diff --git a/glide-core/redis-rs/redis/Cargo.toml b/glide-core/redis-rs/redis/Cargo.toml index fd79ff079e..46f6fe9231 100644 --- a/glide-core/redis-rs/redis/Cargo.toml +++ b/glide-core/redis-rs/redis/Cargo.toml @@ -66,13 +66,9 @@ derivative = { version = "2.2.0", optional = true } # Only needed for async cluster dashmap = { version = "6.0", optional = true } -# Only needed for async_std support -async-std = { version = "1.8.0", optional = true } async-trait = { version = "0.1.24", optional = true } -# To avoid conflicts, backoff-std-async.version != backoff-tokio.version so we could run tests with --all-features -backoff-std-async = { package = "backoff", version = "0.3.0", optional = true, features = ["async-std"] } -# Only needed for tokio support +# Only needed for tokio support backoff-tokio = { package = "backoff", version = "0.4.0", optional = true, features = ["tokio"] } # Only needed for native tls @@ -108,7 +104,18 @@ arcstr = "1.1.5" uuid = { version = "1.6.1", optional = true } [features] -default = ["acl", "streams", "geospatial", "script", "keep-alive"] +default = [ + "acl", + "streams", + "geospatial", + "script", + "keep-alive", + "tokio-comp", + "tokio-rustls-comp", + "connection-manager", + "cluster", + "cluster-async" +] acl = [] aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/alloc", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "combine/tokio", "async-trait", "fast-math", "dispose"] geospatial = [] @@ -119,9 +126,6 @@ tls-native-tls = ["native-tls"] tls-rustls = ["rustls", "rustls-native-certs", "rustls-pemfile", "rustls-pki-types"] tls-rustls-insecure = ["tls-rustls"] tls-rustls-webpki-roots = ["tls-rustls", "webpki-roots"] -async-std-comp = ["aio", "async-std", "backoff-std-async"] -async-std-native-tls-comp = ["async-std-comp", "async-native-tls", "tls-native-tls"] -async-std-rustls-comp = ["async-std-comp", "futures-rustls", "tls-rustls"] tokio-comp = ["aio", "tokio/net", "backoff-tokio"] tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"] tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"] @@ -139,7 +143,6 @@ disable-client-setinfo = [] # Deprecated features tls = ["tls-native-tls"] # use "tls-native-tls" instead -async-std-tls-comp = ["async-std-native-tls-comp"] # use "async-std-native-tls-comp" instead [dev-dependencies] rand = "0.8" @@ -156,15 +159,12 @@ tempfile = "=3.6.0" once_cell = "1" anyhow = "1" sscanf = "0.4.1" +serial_test = "2" [[test]] name = "test_async" required-features = ["tokio-comp"] -[[test]] -name = "test_async_async_std" -required-features = ["async-std-comp"] - [[test]] name = "parser" required-features = ["aio"] @@ -178,7 +178,7 @@ required-features = ["json", "serde/derive"] [[test]] name = "test_cluster_async" -required-features = ["cluster-async"] +required-features = ["cluster-async", "tokio-comp"] [[test]] name = "test_async_cluster_connections_logic" diff --git a/glide-core/redis-rs/redis/src/aio/async_std.rs b/glide-core/redis-rs/redis/src/aio/async_std.rs deleted file mode 100644 index 19c54d3b31..0000000000 --- a/glide-core/redis-rs/redis/src/aio/async_std.rs +++ /dev/null @@ -1,269 +0,0 @@ -#[cfg(unix)] -use std::path::Path; -#[cfg(feature = "tls-rustls")] -use std::sync::Arc; -use std::{ - future::Future, - io, - net::SocketAddr, - pin::Pin, - task::{self, Poll}, -}; - -use crate::aio::{AsyncStream, RedisRuntime}; -use crate::types::RedisResult; - -#[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))] -use async_native_tls::{TlsConnector, TlsStream}; - -#[cfg(feature = "tls-rustls")] -use crate::connection::create_rustls_config; -#[cfg(feature = "tls-rustls")] -use futures_rustls::{client::TlsStream, TlsConnector}; - -use async_std::net::TcpStream; -#[cfg(unix)] -use async_std::os::unix::net::UnixStream; -use async_trait::async_trait; -use futures_util::ready; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - -#[inline(always)] -async fn connect_tcp(addr: &SocketAddr) -> io::Result { - let socket = TcpStream::connect(addr).await?; - #[cfg(feature = "tcp_nodelay")] - socket.set_nodelay(true)?; - #[cfg(feature = "keep-alive")] - { - //For now rely on system defaults - const KEEP_ALIVE: socket2::TcpKeepalive = socket2::TcpKeepalive::new(); - //these are useless error that not going to happen - let mut std_socket = std::net::TcpStream::try_from(socket)?; - let socket2: socket2::Socket = std_socket.into(); - socket2.set_tcp_keepalive(&KEEP_ALIVE)?; - std_socket = socket2.into(); - Ok(std_socket.into()) - } - #[cfg(not(feature = "keep-alive"))] - { - Ok(socket) - } -} -#[cfg(feature = "tls-rustls")] -use crate::tls::TlsConnParams; - -#[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))] -use crate::connection::TlsConnParams; - -pin_project_lite::pin_project! { - /// Wraps the async_std `AsyncRead/AsyncWrite` in order to implement the required the tokio traits - /// for it - pub struct AsyncStdWrapped { #[pin] inner: T } -} - -impl AsyncStdWrapped { - pub(super) fn new(inner: T) -> Self { - Self { inner } - } -} - -impl AsyncWrite for AsyncStdWrapped -where - T: async_std::io::Write, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut core::task::Context, - buf: &[u8], - ) -> std::task::Poll> { - async_std::io::Write::poll_write(self.project().inner, cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut core::task::Context, - ) -> std::task::Poll> { - async_std::io::Write::poll_flush(self.project().inner, cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut core::task::Context, - ) -> std::task::Poll> { - async_std::io::Write::poll_close(self.project().inner, cx) - } -} - -impl AsyncRead for AsyncStdWrapped -where - T: async_std::io::Read, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut core::task::Context, - buf: &mut ReadBuf<'_>, - ) -> std::task::Poll> { - let n = ready!(async_std::io::Read::poll_read( - self.project().inner, - cx, - buf.initialize_unfilled() - ))?; - buf.advance(n); - std::task::Poll::Ready(Ok(())) - } -} - -/// Represents an AsyncStd connectable -pub enum AsyncStd { - /// Represents an Async_std TCP connection. - Tcp(AsyncStdWrapped), - /// Represents an Async_std TLS encrypted TCP connection. - #[cfg(any( - feature = "async-std-native-tls-comp", - feature = "async-std-rustls-comp" - ))] - TcpTls(AsyncStdWrapped>>), - /// Represents an Async_std Unix connection. - #[cfg(unix)] - Unix(AsyncStdWrapped), -} - -impl AsyncWrite for AsyncStd { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut task::Context, - buf: &[u8], - ) -> Poll> { - match &mut *self { - AsyncStd::Tcp(r) => Pin::new(r).poll_write(cx, buf), - #[cfg(any( - feature = "async-std-native-tls-comp", - feature = "async-std-rustls-comp" - ))] - AsyncStd::TcpTls(r) => Pin::new(r).poll_write(cx, buf), - #[cfg(unix)] - AsyncStd::Unix(r) => Pin::new(r).poll_write(cx, buf), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { - match &mut *self { - AsyncStd::Tcp(r) => Pin::new(r).poll_flush(cx), - #[cfg(any( - feature = "async-std-native-tls-comp", - feature = "async-std-rustls-comp" - ))] - AsyncStd::TcpTls(r) => Pin::new(r).poll_flush(cx), - #[cfg(unix)] - AsyncStd::Unix(r) => Pin::new(r).poll_flush(cx), - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { - match &mut *self { - AsyncStd::Tcp(r) => Pin::new(r).poll_shutdown(cx), - #[cfg(any( - feature = "async-std-native-tls-comp", - feature = "async-std-rustls-comp" - ))] - AsyncStd::TcpTls(r) => Pin::new(r).poll_shutdown(cx), - #[cfg(unix)] - AsyncStd::Unix(r) => Pin::new(r).poll_shutdown(cx), - } - } -} - -impl AsyncRead for AsyncStd { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut task::Context, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - match &mut *self { - AsyncStd::Tcp(r) => Pin::new(r).poll_read(cx, buf), - #[cfg(any( - feature = "async-std-native-tls-comp", - feature = "async-std-rustls-comp" - ))] - AsyncStd::TcpTls(r) => Pin::new(r).poll_read(cx, buf), - #[cfg(unix)] - AsyncStd::Unix(r) => Pin::new(r).poll_read(cx, buf), - } - } -} - -#[async_trait] -impl RedisRuntime for AsyncStd { - async fn connect_tcp(socket_addr: SocketAddr) -> RedisResult { - Ok(connect_tcp(&socket_addr) - .await - .map(|con| Self::Tcp(AsyncStdWrapped::new(con)))?) - } - - #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))] - async fn connect_tcp_tls( - hostname: &str, - socket_addr: SocketAddr, - insecure: bool, - _tls_params: &Option, - ) -> RedisResult { - let tcp_stream = connect_tcp(&socket_addr).await?; - let tls_connector = if insecure { - TlsConnector::new() - .danger_accept_invalid_certs(true) - .danger_accept_invalid_hostnames(true) - .use_sni(false) - } else { - TlsConnector::new() - }; - Ok(tls_connector - .connect(hostname, tcp_stream) - .await - .map(|con| Self::TcpTls(AsyncStdWrapped::new(Box::new(con))))?) - } - - #[cfg(feature = "tls-rustls")] - async fn connect_tcp_tls( - hostname: &str, - socket_addr: SocketAddr, - insecure: bool, - tls_params: &Option, - ) -> RedisResult { - let tcp_stream = connect_tcp(&socket_addr).await?; - - let config = create_rustls_config(insecure, tls_params.clone())?; - let tls_connector = TlsConnector::from(Arc::new(config)); - - Ok(tls_connector - .connect( - rustls_pki_types::ServerName::try_from(hostname)?.to_owned(), - tcp_stream, - ) - .await - .map(|con| Self::TcpTls(AsyncStdWrapped::new(Box::new(con))))?) - } - - #[cfg(unix)] - async fn connect_unix(path: &Path) -> RedisResult { - Ok(UnixStream::connect(path) - .await - .map(|con| Self::Unix(AsyncStdWrapped::new(con)))?) - } - - fn spawn(f: impl Future + Send + 'static) { - async_std::task::spawn(f); - } - - fn boxed(self) -> Pin> { - match self { - AsyncStd::Tcp(x) => Box::pin(x), - #[cfg(any( - feature = "async-std-native-tls-comp", - feature = "async-std-rustls-comp" - ))] - AsyncStd::TcpTls(x) => Box::pin(x), - #[cfg(unix)] - AsyncStd::Unix(x) => Box::pin(x), - } - } -} diff --git a/glide-core/redis-rs/redis/src/aio/connection.rs b/glide-core/redis-rs/redis/src/aio/connection.rs index 6b1f6e657a..5adef7869f 100644 --- a/glide-core/redis-rs/redis/src/aio/connection.rs +++ b/glide-core/redis-rs/redis/src/aio/connection.rs @@ -1,7 +1,5 @@ #![allow(deprecated)] -#[cfg(feature = "async-std-comp")] -use super::async_std; use super::ConnectionLike; use super::{setup_connection, AsyncStream, RedisRuntime}; use crate::cmd::{cmd, Cmd}; @@ -9,12 +7,10 @@ use crate::connection::{ resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo, Msg, RedisConnectionInfo, }; -#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] +#[cfg(any(feature = "tokio-comp"))] use crate::parser::ValueCodec; use crate::types::{ErrorKind, FromRedisValue, RedisError, RedisFuture, RedisResult, Value}; use crate::{from_owned_redis_value, ProtocolVersion, ToRedisArgs}; -#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] -use ::async_std::net::ToSocketAddrs; use ::tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; #[cfg(feature = "tokio-comp")] use ::tokio::net::lookup_host; @@ -26,7 +22,7 @@ use futures_util::{ }; use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; -#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] +#[cfg(feature = "tokio-comp")] use tokio_util::codec::Decoder; use tracing::info; @@ -194,19 +190,6 @@ where } } -#[cfg(feature = "async-std-comp")] -#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))] -impl Connection> -where - C: Unpin + ::async_std::io::Read + ::async_std::io::Write + Send, -{ - /// Constructs a new `Connection` out of a `async_std::io::AsyncRead + async_std::io::AsyncWrite` object - /// and a `RedisConnectionInfo` - pub async fn new_async_std(connection_info: &RedisConnectionInfo, con: C) -> RedisResult { - Connection::new(connection_info, async_std::AsyncStdWrapped::new(con)).await - } -} - pub(crate) async fn connect( connection_info: &ConnectionInfo, socket_addr: Option, @@ -436,8 +419,6 @@ pub(crate) async fn get_socket_addrs( ) -> RedisResult + Send + '_> { #[cfg(feature = "tokio-comp")] let socket_addrs = lookup_host((host, port)).await?; - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - let socket_addrs = (host, port).to_socket_addrs().await?; let mut socket_addrs = socket_addrs.peekable(); match socket_addrs.peek() { diff --git a/glide-core/redis-rs/redis/src/aio/connection_manager.rs b/glide-core/redis-rs/redis/src/aio/connection_manager.rs index 61df9bc31a..dce7b254a5 100644 --- a/glide-core/redis-rs/redis/src/aio/connection_manager.rs +++ b/glide-core/redis-rs/redis/src/aio/connection_manager.rs @@ -7,8 +7,7 @@ use crate::{ aio::{ConnectionLike, MultiplexedConnection, Runtime}, Client, }; -#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] -use ::async_std::net::ToSocketAddrs; + use arc_swap::ArcSwap; use futures::{ future::{self, Shared}, diff --git a/glide-core/redis-rs/redis/src/aio/mod.rs b/glide-core/redis-rs/redis/src/aio/mod.rs index ffe2c9e3a2..34c098d600 100644 --- a/glide-core/redis-rs/redis/src/aio/mod.rs +++ b/glide-core/redis-rs/redis/src/aio/mod.rs @@ -14,11 +14,6 @@ use std::path::Path; use std::pin::Pin; use std::time::Duration; -/// Enables the async_std compatibility -#[cfg(feature = "async-std-comp")] -#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))] -pub mod async_std; - #[cfg(feature = "tls-rustls")] use crate::tls::TlsConnParams; diff --git a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs index 1067bc2df5..fb1b62f8a1 100644 --- a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs +++ b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs @@ -3,7 +3,7 @@ use crate::aio::setup_connection; use crate::aio::DisconnectNotifier; use crate::client::GlideConnectionOptions; use crate::cmd::Cmd; -#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] +#[cfg(feature = "tokio-comp")] use crate::parser::ValueCodec; use crate::push_manager::PushManager; use crate::types::{RedisError, RedisFuture, RedisResult, Value}; @@ -29,7 +29,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{self, Poll}; use std::time::Duration; -#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] +#[cfg(feature = "tokio-comp")] use tokio_util::codec::Decoder; // Senders which the result of a single request are sent through @@ -448,8 +448,8 @@ impl MultiplexedConnection { Box::pin(f) } - #[cfg(all(not(feature = "tokio-comp"), not(feature = "async-std-comp")))] - compile_error!("tokio-comp or async-std-comp features required for aio feature"); + #[cfg(not(feature = "tokio-comp"))] + compile_error!("tokio-comp feature is required for aio feature"); let redis_connection_info = &connection_info.redis; let codec = ValueCodec::default() diff --git a/glide-core/redis-rs/redis/src/aio/runtime.rs b/glide-core/redis-rs/redis/src/aio/runtime.rs index 5755f62c9f..2222783ed8 100644 --- a/glide-core/redis-rs/redis/src/aio/runtime.rs +++ b/glide-core/redis-rs/redis/src/aio/runtime.rs @@ -2,8 +2,6 @@ use std::{io, time::Duration}; use futures_util::Future; -#[cfg(feature = "async-std-comp")] -use super::async_std; #[cfg(feature = "tokio-comp")] use super::tokio; use super::RedisRuntime; @@ -13,34 +11,17 @@ use crate::types::RedisError; pub(crate) enum Runtime { #[cfg(feature = "tokio-comp")] Tokio, - #[cfg(feature = "async-std-comp")] - AsyncStd, } impl Runtime { pub(crate) fn locate() -> Self { - #[cfg(all(feature = "tokio-comp", not(feature = "async-std-comp")))] + #[cfg(not(feature = "tokio-comp"))] { - Runtime::Tokio - } - - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - { - Runtime::AsyncStd + compile_error!("tokio-comp feature is required for aio feature") } - - #[cfg(all(feature = "tokio-comp", feature = "async-std-comp"))] + #[cfg(feature = "tokio-comp")] { - if ::tokio::runtime::Handle::try_current().is_ok() { - Runtime::Tokio - } else { - Runtime::AsyncStd - } - } - - #[cfg(all(not(feature = "tokio-comp"), not(feature = "async-std-comp")))] - { - compile_error!("tokio-comp or async-std-comp features required for aio feature") + Runtime::Tokio } } @@ -49,8 +30,6 @@ impl Runtime { match self { #[cfg(feature = "tokio-comp")] Runtime::Tokio => tokio::Tokio::spawn(f), - #[cfg(feature = "async-std-comp")] - Runtime::AsyncStd => async_std::AsyncStd::spawn(f), } } @@ -64,10 +43,6 @@ impl Runtime { Runtime::Tokio => ::tokio::time::timeout(duration, future) .await .map_err(|_| Elapsed(())), - #[cfg(feature = "async-std-comp")] - Runtime::AsyncStd => ::async_std::future::timeout(duration, future) - .await - .map_err(|_| Elapsed(())), } } } diff --git a/glide-core/redis-rs/redis/src/client.rs b/glide-core/redis-rs/redis/src/client.rs index 5e3f144e71..fd8c4c08b4 100644 --- a/glide-core/redis-rs/redis/src/client.rs +++ b/glide-core/redis-rs/redis/src/client.rs @@ -88,13 +88,12 @@ pub struct GlideConnectionOptions { pub disconnect_notifier: Option>, } -/// To enable async support you need to chose one of the supported runtimes and active its -/// corresponding feature: `tokio-comp` or `async-std-comp` +/// To enable async support you need to enable the feature: `tokio-comp` #[cfg(feature = "aio")] #[cfg_attr(docsrs, doc(cfg(feature = "aio")))] impl Client { /// Returns an async connection from the client. - #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] + #[cfg(feature = "tokio-comp")] #[deprecated( note = "aio::Connection is deprecated. Use client::get_multiplexed_async_connection instead." )] @@ -109,11 +108,6 @@ impl Client { self.get_simple_async_connection::(None) .await? } - #[cfg(feature = "async-std-comp")] - Runtime::AsyncStd => { - self.get_simple_async_connection::(None) - .await? - } }; crate::aio::Connection::new(&self.connection_info.redis, con).await @@ -136,27 +130,8 @@ impl Client { } /// Returns an async connection from the client. - #[cfg(feature = "async-std-comp")] - #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))] - #[deprecated( - note = "aio::Connection is deprecated. Use client::get_multiplexed_async_std_connection instead." - )] - #[allow(deprecated)] - pub async fn get_async_std_connection(&self) -> RedisResult { - use crate::aio::RedisRuntime; - Ok( - crate::aio::connect::(&self.connection_info, None) - .await? - .map(RedisRuntime::boxed), - ) - } - - /// Returns an async connection from the client. - #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] - #[cfg_attr( - docsrs, - doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp"))) - )] + #[cfg(feature = "tokio-comp")] + #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))] pub async fn get_multiplexed_async_connection( &self, glide_connection_options: GlideConnectionOptions, @@ -170,11 +145,8 @@ impl Client { } /// Returns an async connection from the client. - #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] - #[cfg_attr( - docsrs, - doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp"))) - )] + #[cfg(feature = "tokio-comp")] + #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))] pub async fn get_multiplexed_async_connection_with_timeouts( &self, response_timeout: std::time::Duration, @@ -194,18 +166,6 @@ impl Client { ) .await } - #[cfg(feature = "async-std-comp")] - rt @ Runtime::AsyncStd => { - rt.timeout( - connection_timeout, - self.get_multiplexed_async_connection_inner::( - response_timeout, - None, - glide_connection_options, - ), - ) - .await - } }; match result { @@ -218,11 +178,8 @@ impl Client { /// For TCP connections: returns (async connection, Some(the direct IP address)) /// For Unix connections, returns (async connection, None) - #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] - #[cfg_attr( - docsrs, - doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp"))) - )] + #[cfg(feature = "tokio-comp")] + #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))] pub async fn get_multiplexed_async_connection_and_ip( &self, glide_connection_options: GlideConnectionOptions, @@ -237,15 +194,6 @@ impl Client { ) .await } - #[cfg(feature = "async-std-comp")] - Runtime::AsyncStd => { - self.get_multiplexed_async_connection_inner::( - Duration::MAX, - None, - glide_connection_options, - ) - .await - } } } @@ -297,54 +245,6 @@ impl Client { .await } - /// Returns an async multiplexed connection from the client. - /// - /// A multiplexed connection can be cloned, allowing requests to be be sent concurrently - /// on the same underlying connection (tcp/unix socket). - #[cfg(feature = "async-std-comp")] - #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))] - pub async fn get_multiplexed_async_std_connection_with_timeouts( - &self, - response_timeout: std::time::Duration, - connection_timeout: std::time::Duration, - glide_connection_options: GlideConnectionOptions, - ) -> RedisResult { - let result = Runtime::locate() - .timeout( - connection_timeout, - self.get_multiplexed_async_connection_inner::( - response_timeout, - None, - glide_connection_options, - ), - ) - .await; - - match result { - Ok(Ok((connection, _ip))) => Ok(connection), - Ok(Err(e)) => Err(e), - Err(elapsed) => Err(elapsed.into()), - } - } - - /// Returns an async multiplexed connection from the client. - /// - /// A multiplexed connection can be cloned, allowing requests to be be sent concurrently - /// on the same underlying connection (tcp/unix socket). - #[cfg(feature = "async-std-comp")] - #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))] - pub async fn get_multiplexed_async_std_connection( - &self, - glide_connection_options: GlideConnectionOptions, - ) -> RedisResult { - self.get_multiplexed_async_std_connection_with_timeouts( - std::time::Duration::MAX, - std::time::Duration::MAX, - glide_connection_options, - ) - .await - } - /// Returns an async multiplexed connection from the client and a future which must be polled /// to drive any requests submitted to it (see `get_multiplexed_tokio_connection`). /// @@ -392,52 +292,6 @@ impl Client { .map(|conn_res| (conn_res.0, conn_res.1)) } - /// Returns an async multiplexed connection from the client and a future which must be polled - /// to drive any requests submitted to it (see `get_multiplexed_tokio_connection`). - /// - /// A multiplexed connection can be cloned, allowing requests to be be sent concurrently - /// on the same underlying connection (tcp/unix socket). - /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`. - #[cfg(feature = "async-std-comp")] - #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))] - pub async fn create_multiplexed_async_std_connection_with_response_timeout( - &self, - response_timeout: std::time::Duration, - glide_connection_options: GlideConnectionOptions, - ) -> RedisResult<( - crate::aio::MultiplexedConnection, - impl std::future::Future, - )> { - self.create_multiplexed_async_connection_inner::( - response_timeout, - None, - glide_connection_options, - ) - .await - .map(|(conn, driver, _ip)| (conn, driver)) - } - - /// Returns an async multiplexed connection from the client and a future which must be polled - /// to drive any requests submitted to it (see `get_multiplexed_tokio_connection`). - /// - /// A multiplexed connection can be cloned, allowing requests to be be sent concurrently - /// on the same underlying connection (tcp/unix socket). - #[cfg(feature = "async-std-comp")] - #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))] - pub async fn create_multiplexed_async_std_connection( - &self, - glide_connection_options: GlideConnectionOptions, - ) -> RedisResult<( - crate::aio::MultiplexedConnection, - impl std::future::Future, - )> { - self.create_multiplexed_async_std_connection_with_response_timeout( - std::time::Duration::MAX, - glide_connection_options, - ) - .await - } - /// Returns an async [`ConnectionManager`][connection-manager] from the client. /// /// The connection manager wraps a @@ -785,7 +639,7 @@ impl Client { } /// Returns an async receiver for pub-sub messages. - #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] + #[cfg(feature = "tokio-comp")] // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later? pub async fn get_async_pubsub(&self) -> RedisResult { #[allow(deprecated)] @@ -795,7 +649,7 @@ impl Client { } /// Returns an async receiver for monitor messages. - #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] + #[cfg(feature = "tokio-comp")] // TODO - do we want to type-erase monitor using a trait, to allow us to replace it with a different implementation later? pub async fn get_async_monitor(&self) -> RedisResult { #[allow(deprecated)] diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index be7beb79b7..c8628c16bb 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -38,11 +38,7 @@ use crate::{ commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC}, FromRedisValue, InfoDict, ToRedisArgs, }; -#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] -use async_std::task::{spawn, JoinHandle}; use dashmap::DashMap; -#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] -use futures::executor::block_on; use std::{ collections::{HashMap, HashSet}, fmt, io, mem, @@ -84,13 +80,6 @@ use crate::{ use futures::stream::{FuturesUnordered, StreamExt}; use std::time::Duration; -#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] -use crate::aio::{async_std::AsyncStd, RedisRuntime}; -#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] -use backoff_std_async::future::retry; -#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] -use backoff_std_async::{Error as BackoffError, ExponentialBackoff}; - #[cfg(feature = "tokio-comp")] use async_trait::async_trait; #[cfg(feature = "tokio-comp")] @@ -142,9 +131,6 @@ where }; #[cfg(feature = "tokio-comp")] tokio::spawn(stream); - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - AsyncStd::spawn(stream); - ClusterConnection(tx) }) } @@ -510,14 +496,10 @@ pub(crate) struct ClusterConnInner { impl Dispose for ClusterConnInner { fn dispose(self) { if let Some(handle) = self.periodic_checks_handler { - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - block_on(handle.cancel()); #[cfg(feature = "tokio-comp")] handle.abort() } if let Some(handle) = self.connections_validation_handler { - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - block_on(handle.cancel()); #[cfg(feature = "tokio-comp")] handle.abort() } @@ -657,9 +639,6 @@ fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> { #[cfg(feature = "tokio-comp")] return Box::pin(tokio::time::sleep(duration)); - - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - return Box::pin(async_std::task::sleep(duration)); } pub(crate) enum Response { @@ -1080,10 +1059,6 @@ where { connection.periodic_checks_handler = Some(tokio::spawn(periodic_task)); } - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - { - connection.periodic_checks_handler = Some(spawn(periodic_task)); - } } let connections_validation_interval = cluster_params.connections_validation_interval; @@ -1095,11 +1070,6 @@ where connection.connections_validation_handler = Some(tokio::spawn(connections_validation_handler)); } - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - { - connection.connections_validation_handler = - Some(spawn(connections_validation_handler)); - } } Ok(Disposable::new(connection)) @@ -2555,16 +2525,6 @@ impl Connect for MultiplexedConnection { ) .await? } - #[cfg(feature = "async-std-comp")] - rt @ Runtime::AsyncStd => { - rt.timeout(connection_timeout,client - .get_multiplexed_async_connection_inner::( - response_timeout, - socket_addr, - glide_connection_options, - )) - .await? - } } } .boxed() diff --git a/glide-core/redis-rs/redis/src/sentinel.rs b/glide-core/redis-rs/redis/src/sentinel.rs index ac6aac65cc..569ab2fe0f 100644 --- a/glide-core/redis-rs/redis/src/sentinel.rs +++ b/glide-core/redis-rs/redis/src/sentinel.rs @@ -746,8 +746,7 @@ impl SentinelClient { } } -/// To enable async support you need to chose one of the supported runtimes and active its -/// corresponding feature: `tokio-comp` or `async-std-comp` +/// To enable async support you need to enable the feature: `tokio-comp` #[cfg(feature = "aio")] #[cfg_attr(docsrs, doc(cfg(feature = "aio")))] impl SentinelClient { @@ -768,7 +767,7 @@ impl SentinelClient { /// Returns an async connection from the client, using the same logic from /// `SentinelClient::get_connection`. - #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] + #[cfg(feature = "tokio-comp")] pub async fn get_async_connection(&mut self) -> RedisResult { let client = self.async_get_client().await?; client diff --git a/glide-core/redis-rs/redis/tests/support/mod.rs b/glide-core/redis-rs/redis/tests/support/mod.rs index 335cd045de..72dc7c9a78 100644 --- a/glide-core/redis-rs/redis/tests/support/mod.rs +++ b/glide-core/redis-rs/redis/tests/support/mod.rs @@ -85,14 +85,6 @@ where res } -#[cfg(feature = "async-std-comp")] -pub fn block_on_all_using_async_std(f: F) -> F::Output -where - F: Future, -{ - async_std::task::block_on(f) -} - #[cfg(any(feature = "cluster", feature = "cluster-async"))] mod cluster; @@ -514,15 +506,6 @@ impl TestContext { self.client.get_async_pubsub().await } - #[cfg(feature = "async-std-comp")] - pub async fn async_connection_async_std( - &self, - ) -> redis::RedisResult { - self.client - .get_multiplexed_async_std_connection(GlideConnectionOptions::default()) - .await - } - pub fn stop_server(&mut self) { self.server.stop(); } @@ -543,15 +526,6 @@ impl TestContext { .await } - #[cfg(feature = "async-std-comp")] - pub async fn multiplexed_async_connection_async_std( - &self, - ) -> redis::RedisResult { - self.client - .get_multiplexed_async_std_connection(GlideConnectionOptions::default()) - .await - } - pub fn get_version(&self) -> Version { let mut conn = self.connection(); get_version(&mut conn) diff --git a/glide-core/redis-rs/redis/tests/test_async_async_std.rs b/glide-core/redis-rs/redis/tests/test_async_async_std.rs deleted file mode 100644 index 656d1979f6..0000000000 --- a/glide-core/redis-rs/redis/tests/test_async_async_std.rs +++ /dev/null @@ -1,328 +0,0 @@ -#![allow(unknown_lints, dependency_on_unit_never_type_fallback)] -use futures::prelude::*; - -use crate::support::*; - -use redis::{aio::MultiplexedConnection, GlideConnectionOptions, RedisResult}; - -mod support; - -#[test] -fn test_args() { - let ctx = TestContext::new(); - let connect = ctx.async_connection_async_std(); - - block_on_all_using_async_std(connect.and_then(|mut con| async move { - redis::cmd("SET") - .arg("key1") - .arg(b"foo") - .query_async(&mut con) - .await?; - redis::cmd("SET") - .arg(&["key2", "bar"]) - .query_async(&mut con) - .await?; - let result = redis::cmd("MGET") - .arg(&["key1", "key2"]) - .query_async(&mut con) - .await; - assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec()))); - result - })) - .unwrap(); -} - -#[test] -fn test_args_async_std() { - let ctx = TestContext::new(); - let connect = ctx.async_connection_async_std(); - - block_on_all_using_async_std(connect.and_then(|mut con| async move { - redis::cmd("SET") - .arg("key1") - .arg(b"foo") - .query_async(&mut con) - .await?; - redis::cmd("SET") - .arg(&["key2", "bar"]) - .query_async(&mut con) - .await?; - let result = redis::cmd("MGET") - .arg(&["key1", "key2"]) - .query_async(&mut con) - .await; - assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec()))); - result - })) - .unwrap(); -} - -#[test] -fn dont_panic_on_closed_multiplexed_connection() { - let ctx = TestContext::new(); - let client = ctx.client.clone(); - let connect = client.get_multiplexed_async_std_connection(GlideConnectionOptions::default()); - drop(ctx); - - block_on_all_using_async_std(async move { - connect - .and_then(|con| async move { - let cmd = move || { - let mut con = con.clone(); - async move { - redis::cmd("SET") - .arg("key1") - .arg(b"foo") - .query_async(&mut con) - .await - } - }; - let result: RedisResult<()> = cmd().await; - assert_eq!( - result.as_ref().unwrap_err().kind(), - redis::ErrorKind::IoError, - "{}", - result.as_ref().unwrap_err() - ); - cmd().await - }) - .map(|result| { - assert_eq!( - result.as_ref().unwrap_err().kind(), - redis::ErrorKind::IoError, - "{}", - result.as_ref().unwrap_err() - ); - }) - .await - }); -} - -#[test] -fn test_pipeline_transaction() { - let ctx = TestContext::new(); - block_on_all_using_async_std(async move { - let mut con = ctx.async_connection_async_std().await?; - let mut pipe = redis::pipe(); - pipe.atomic() - .cmd("SET") - .arg("key_1") - .arg(42) - .ignore() - .cmd("SET") - .arg("key_2") - .arg(43) - .ignore() - .cmd("MGET") - .arg(&["key_1", "key_2"]); - pipe.query_async(&mut con) - .map_ok(|((k1, k2),): ((i32, i32),)| { - assert_eq!(k1, 42); - assert_eq!(k2, 43); - }) - .await - }) - .unwrap(); -} - -fn test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future> + Send { - let mut con = con.clone(); - async move { - let key = format!("key{i}"); - let key_2 = key.clone(); - let key2 = format!("key{i}_2"); - let key2_2 = key2.clone(); - - let foo_val = format!("foo{i}"); - - redis::cmd("SET") - .arg(&key[..]) - .arg(foo_val.as_bytes()) - .query_async(&mut con) - .await?; - redis::cmd("SET") - .arg(&[&key2, "bar"]) - .query_async(&mut con) - .await?; - redis::cmd("MGET") - .arg(&[&key_2, &key2_2]) - .query_async(&mut con) - .map(|result| { - assert_eq!(Ok((foo_val, b"bar".to_vec())), result); - Ok(()) - }) - .await - } -} - -fn test_error(con: &MultiplexedConnection) -> impl Future> { - let mut con = con.clone(); - async move { - redis::cmd("SET") - .query_async(&mut con) - .map(|result| match result { - Ok(()) => panic!("Expected redis to return an error"), - Err(_) => Ok(()), - }) - .await - } -} - -#[test] -fn test_args_multiplexed_connection() { - let ctx = TestContext::new(); - block_on_all_using_async_std(async move { - ctx.multiplexed_async_connection_async_std() - .and_then(|con| { - let cmds = (0..100).map(move |i| test_cmd(&con, i)); - future::try_join_all(cmds).map_ok(|results| { - assert_eq!(results.len(), 100); - }) - }) - .map_err(|err| panic!("{}", err)) - .await - }) - .unwrap(); -} - -#[test] -fn test_args_with_errors_multiplexed_connection() { - let ctx = TestContext::new(); - block_on_all_using_async_std(async move { - ctx.multiplexed_async_connection_async_std() - .and_then(|con| { - let cmds = (0..100).map(move |i| { - let con = con.clone(); - async move { - if i % 2 == 0 { - test_cmd(&con, i).await - } else { - test_error(&con).await - } - } - }); - future::try_join_all(cmds).map_ok(|results| { - assert_eq!(results.len(), 100); - }) - }) - .map_err(|err| panic!("{}", err)) - .await - }) - .unwrap(); -} - -#[test] -fn test_transaction_multiplexed_connection() { - let ctx = TestContext::new(); - block_on_all_using_async_std(async move { - ctx.multiplexed_async_connection_async_std() - .and_then(|con| { - let cmds = (0..100).map(move |i| { - let mut con = con.clone(); - async move { - let foo_val = i; - let bar_val = format!("bar{i}"); - - let mut pipe = redis::pipe(); - pipe.atomic() - .cmd("SET") - .arg("key") - .arg(foo_val) - .ignore() - .cmd("SET") - .arg(&["key2", &bar_val[..]]) - .ignore() - .cmd("MGET") - .arg(&["key", "key2"]); - - pipe.query_async(&mut con) - .map(move |result| { - assert_eq!(Ok(((foo_val, bar_val.into_bytes()),)), result); - result - }) - .await - } - }); - future::try_join_all(cmds) - }) - .map_ok(|results| { - assert_eq!(results.len(), 100); - }) - .map_err(|err| panic!("{}", err)) - .await - }) - .unwrap(); -} - -#[test] -#[cfg(feature = "script")] -fn test_script() { - use redis::RedisError; - - // Note this test runs both scripts twice to test when they have already been loaded - // into Redis and when they need to be loaded in - let script1 = redis::Script::new("return redis.call('SET', KEYS[1], ARGV[1])"); - let script2 = redis::Script::new("return redis.call('GET', KEYS[1])"); - let script3 = redis::Script::new("return redis.call('KEYS', '*')"); - - let ctx = TestContext::new(); - - block_on_all_using_async_std(async move { - let mut con = ctx.multiplexed_async_connection_async_std().await?; - script1 - .key("key1") - .arg("foo") - .invoke_async(&mut con) - .await?; - let val: String = script2.key("key1").invoke_async(&mut con).await?; - assert_eq!(val, "foo"); - let keys: Vec = script3.invoke_async(&mut con).await?; - assert_eq!(keys, ["key1"]); - script1 - .key("key1") - .arg("bar") - .invoke_async(&mut con) - .await?; - let val: String = script2.key("key1").invoke_async(&mut con).await?; - assert_eq!(val, "bar"); - let keys: Vec = script3.invoke_async(&mut con).await?; - assert_eq!(keys, ["key1"]); - Ok::<_, RedisError>(()) - }) - .unwrap(); -} - -#[test] -#[cfg(feature = "script")] -fn test_script_load() { - let ctx = TestContext::new(); - let script = redis::Script::new("return 'Hello World'"); - - block_on_all(async move { - let mut con = ctx.multiplexed_async_connection_async_std().await.unwrap(); - - let hash = script.prepare_invoke().load_async(&mut con).await.unwrap(); - assert_eq!(hash, script.get_hash().to_string()); - Ok(()) - }) - .unwrap(); -} - -#[test] -#[cfg(feature = "script")] -fn test_script_returning_complex_type() { - let ctx = TestContext::new(); - block_on_all_using_async_std(async { - let mut con = ctx.multiplexed_async_connection_async_std().await?; - redis::Script::new("return {1, ARGV[1], true}") - .arg("hello") - .invoke_async(&mut con) - .map_ok(|(i, s, b): (i32, String, bool)| { - assert_eq!(i, 1); - assert_eq!(s, "hello"); - assert!(b); - }) - .await - }) - .unwrap(); -} diff --git a/glide-core/redis-rs/redis/tests/test_basic.rs b/glide-core/redis-rs/redis/tests/test_basic.rs index e31c33384c..4c9ad3aae8 100644 --- a/glide-core/redis-rs/redis/tests/test_basic.rs +++ b/glide-core/redis-rs/redis/tests/test_basic.rs @@ -19,6 +19,7 @@ mod basic { use crate::{assert_args, support::*}; #[test] + #[serial_test::serial] fn test_parse_redis_url() { let redis_url = "redis://127.0.0.1:1234/0".to_string(); redis::parse_redis_url(&redis_url).unwrap(); @@ -27,11 +28,13 @@ mod basic { } #[test] + #[serial_test::serial] fn test_redis_url_fromstr() { let _info: ConnectionInfo = "redis://127.0.0.1:1234/0".parse().unwrap(); } #[test] + #[serial_test::serial] fn test_args() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -46,6 +49,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_getset() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -62,6 +66,7 @@ mod basic { //unit test for key_type function #[test] + #[serial_test::serial] fn test_key_type() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -107,6 +112,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_client_tracking_doesnt_block_execution() { //It checks if the library distinguish a push-type message from the others and continues its normal operation. let ctx = TestContext::new(); @@ -144,6 +150,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_incr() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -153,6 +160,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_getdel() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -168,6 +176,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_getex() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -202,6 +211,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_info() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -218,6 +228,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_hash_ops() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -248,6 +259,7 @@ mod basic { // Not supported with the current appveyor/windows binary deployed. #[cfg(not(target_os = "windows"))] #[test] + #[serial_test::serial] fn test_unlink() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -262,6 +274,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_set_ops() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -287,6 +300,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_scan() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -305,6 +319,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_optionals() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -327,6 +342,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_scanning() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -354,6 +370,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_filtered_scanning() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -380,6 +397,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pipeline() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -403,6 +421,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pipeline_with_err() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -440,6 +459,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_empty_pipeline() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -450,6 +470,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pipeline_transaction() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -474,6 +495,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pipeline_transaction_with_errors() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -502,6 +524,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pipeline_reuse_query() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -540,6 +563,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pipeline_reuse_query_clear() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -578,6 +602,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_real_transaction() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -612,6 +637,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_real_transaction_highlevel() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -635,6 +661,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pubsub() { use std::sync::{Arc, Barrier}; let ctx = TestContext::new(); @@ -672,6 +699,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pubsub_unsubscribe() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -693,6 +721,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pubsub_subscribe_while_messages_are_sent() { let ctx = TestContext::new(); let mut conn_external = ctx.connection(); @@ -751,6 +780,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pubsub_unsubscribe_no_subs() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -766,6 +796,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pubsub_unsubscribe_one_sub() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -782,6 +813,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_pubsub_unsubscribe_one_sub_one_psub() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -799,6 +831,7 @@ mod basic { } #[test] + #[serial_test::serial] fn scoped_pubsub() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -847,6 +880,7 @@ mod basic { } #[test] + #[serial_test::serial] #[cfg(feature = "script")] fn test_script() { let ctx = TestContext::new(); @@ -869,6 +903,7 @@ mod basic { } #[test] + #[serial_test::serial] #[cfg(feature = "script")] fn test_script_load() { let ctx = TestContext::new(); @@ -882,6 +917,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_tuple_args() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -908,6 +944,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_nice_api() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -931,6 +968,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_auto_m_versions() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -942,6 +980,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_nice_hash_api() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -995,6 +1034,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_nice_list_api() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1023,6 +1063,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_tuple_decoding_regression() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1041,6 +1082,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_bit_operations() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1050,6 +1092,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_redis_server_down() { let mut ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1067,6 +1110,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_zinterstore_weights() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1122,6 +1166,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_zunionstore_weights() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1189,6 +1234,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_zrembylex() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1221,6 +1267,7 @@ mod basic { // Not supported with the current appveyor/windows binary deployed. #[cfg(not(target_os = "windows"))] #[test] + #[serial_test::serial] fn test_zrandmember() { use redis::ProtocolVersion; @@ -1271,6 +1318,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_sismember() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1289,6 +1337,7 @@ mod basic { // Not supported with the current appveyor/windows binary deployed. #[cfg(not(target_os = "windows"))] #[test] + #[serial_test::serial] fn test_smismember() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1300,6 +1349,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_object_commands() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1335,6 +1385,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_mget() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1355,6 +1406,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_variable_length_get() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1367,6 +1419,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_multi_generics() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1378,6 +1431,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_set_options_with_get() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1392,6 +1446,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_set_options_options() { let empty = SetOptions::default(); assert_eq!(ToRedisArgs::to_redis_args(&empty).len(), 0); @@ -1428,6 +1483,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_blocking_sorted_set_api() { let ctx = TestContext::new(); let mut con = ctx.connection(); @@ -1484,6 +1540,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_set_client_name_by_config() { const CLIENT_NAME: &str = "TEST_CLIENT_NAME"; @@ -1507,6 +1564,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_push_manager() { let ctx = TestContext::new(); if ctx.protocol == ProtocolVersion::RESP2 { @@ -1562,6 +1620,7 @@ mod basic { } #[test] + #[serial_test::serial] fn test_push_manager_disconnection() { let ctx = TestContext::new(); if ctx.protocol == ProtocolVersion::RESP2 { diff --git a/glide-core/redis-rs/redis/tests/test_cluster.rs b/glide-core/redis-rs/redis/tests/test_cluster.rs index cbeddd2fe4..38b3019edb 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster.rs @@ -16,6 +16,7 @@ mod cluster { }; #[test] + #[serial_test::serial] fn test_cluster_basics() { let cluster = TestClusterContext::new(3, 0); let mut con = cluster.connection(); @@ -35,6 +36,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_with_username_and_password() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -65,6 +67,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_with_bad_password() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -80,6 +83,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_read_from_replicas() { let cluster = TestClusterContext::new_with_cluster_client_builder( 6, @@ -106,6 +110,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_eval() { let cluster = TestClusterContext::new(3, 0); let mut con = cluster.connection(); @@ -127,6 +132,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_resp3() { if use_protocol() == ProtocolVersion::RESP2 { return; @@ -159,6 +165,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_multi_shard_commands() { let cluster = TestClusterContext::new(3, 0); @@ -173,6 +180,7 @@ mod cluster { } #[test] + #[serial_test::serial] #[cfg(feature = "script")] fn test_cluster_script() { let cluster = TestClusterContext::new(3, 0); @@ -191,6 +199,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_pipeline() { let cluster = TestClusterContext::new(3, 0); cluster.wait_for_cluster_up(); @@ -207,6 +216,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_pipeline_multiple_keys() { use redis::FromRedisValue; let cluster = TestClusterContext::new(3, 0); @@ -244,6 +254,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_pipeline_invalid_command() { let cluster = TestClusterContext::new(3, 0); cluster.wait_for_cluster_up(); @@ -272,6 +283,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_can_connect_to_server_that_sends_cluster_slots_without_host_name() { let name = "test_cluster_can_connect_to_server_that_sends_cluster_slots_without_host_name"; @@ -298,6 +310,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_can_connect_to_server_that_sends_cluster_slots_with_null_host_name() { let name = "test_cluster_can_connect_to_server_that_sends_cluster_slots_with_null_host_name"; @@ -322,6 +335,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_can_connect_to_server_that_sends_cluster_slots_with_partial_nodes_with_unknown_host_name( ) { let name = "test_cluster_can_connect_to_server_that_sends_cluster_slots_with_partial_nodes_with_unknown_host_name"; @@ -358,6 +372,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_pipeline_command_ordering() { let cluster = TestClusterContext::new(3, 0); cluster.wait_for_cluster_up(); @@ -383,6 +398,7 @@ mod cluster { } #[test] + #[serial_test::serial] #[ignore] // Flaky fn test_cluster_pipeline_ordering_with_improper_command() { let cluster = TestClusterContext::new(3, 0); @@ -417,6 +433,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_retries() { let name = "tryagain"; @@ -444,6 +461,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_exhaust_retries() { let name = "tryagain_exhaust_retries"; @@ -479,6 +497,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_move_error_when_new_node_is_added() { let name = "rebuild_with_extra_nodes"; @@ -536,6 +555,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_ask_redirect() { let name = "node"; let completed = Arc::new(AtomicI32::new(0)); @@ -580,6 +600,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_ask_error_when_new_node_is_added() { let name = "ask_with_extra_nodes"; @@ -629,6 +650,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_replica_read() { let name = "node"; @@ -682,6 +704,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_io_error() { let name = "node"; let completed = Arc::new(AtomicI32::new(0)); @@ -715,6 +738,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_non_retryable_error_should_not_retry() { let name = "node"; let completed = Arc::new(AtomicI32::new(0)); @@ -785,16 +809,19 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_fan_out_to_all_primaries() { test_cluster_fan_out("FLUSHALL", vec![6379, 6381], None); } #[test] + #[serial_test::serial] fn test_cluster_fan_out_to_all_nodes() { test_cluster_fan_out("CONFIG SET", vec![6379, 6380, 6381, 6382], None); } #[test] + #[serial_test::serial] fn test_cluster_fan_out_out_once_to_each_primary_when_no_replicas_are_available() { test_cluster_fan_out( "CONFIG SET", @@ -815,6 +842,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_fan_out_out_once_even_if_primary_has_multiple_slot_ranges() { test_cluster_fan_out( "CONFIG SET", @@ -845,6 +873,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_split_multi_shard_command_and_combine_arrays_of_values() { let name = "test_cluster_split_multi_shard_command_and_combine_arrays_of_values"; let mut cmd = cmd("MGET"); @@ -882,6 +911,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_route_correctly_on_packed_transaction_with_single_node_requests() { let name = "test_cluster_route_correctly_on_packed_transaction_with_single_node_requests"; let mut pipeline = redis::pipe(); @@ -931,6 +961,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_route_correctly_on_packed_transaction_with_single_node_requests2() { let name = "test_cluster_route_correctly_on_packed_transaction_with_single_node_requests2"; let mut pipeline = redis::pipe(); @@ -974,6 +1005,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_with_client_name() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -1001,6 +1033,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_can_be_created_with_partial_slot_coverage() { let name = "test_cluster_can_be_created_with_partial_slot_coverage"; let slots_config = Some(vec![ @@ -1046,6 +1079,7 @@ mod cluster { use redis::ConnectionInfo; #[test] + #[serial_test::serial] fn test_cluster_basics_with_mtls() { let cluster = TestClusterContext::new_with_mtls(3, 0); @@ -1067,6 +1101,7 @@ mod cluster { } #[test] + #[serial_test::serial] fn test_cluster_should_not_connect_without_mtls() { let cluster = TestClusterContext::new_with_mtls(3, 0); diff --git a/glide-core/redis-rs/redis/tests/test_cluster_async.rs b/glide-core/redis-rs/redis/tests/test_cluster_async.rs index b690ed87b5..e6a5984fa7 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_async.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_async.rs @@ -99,6 +99,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_basic_cmd() { let cluster = TestClusterContext::new(3, 0); @@ -121,6 +122,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_basic_eval() { let cluster = TestClusterContext::new(3, 0); @@ -140,6 +142,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_basic_script() { let cluster = TestClusterContext::new(3, 0); @@ -159,6 +162,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_route_flush_to_specific_node() { let cluster = TestClusterContext::new(3, 0); @@ -194,6 +198,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_route_flush_to_node_by_address() { let cluster = TestClusterContext::new(3, 0); @@ -234,6 +239,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_route_info_to_nodes() { let cluster = TestClusterContext::new(12, 1); @@ -313,6 +319,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_resp3() { if use_protocol() == ProtocolVersion::RESP2 { return; @@ -352,6 +359,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_basic_pipe() { let cluster = TestClusterContext::new(3, 0); @@ -371,6 +379,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_multi_shard_commands() { let cluster = TestClusterContext::new(3, 0); @@ -389,6 +398,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_basic_failover() { block_on_all(async move { test_failover(&TestClusterContext::new(6, 1), 10, 123, false).await; @@ -582,6 +592,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_error_in_inner_connection() { let cluster = TestClusterContext::new(3, 0); @@ -606,30 +617,7 @@ mod cluster_async { } #[test] - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - fn test_async_cluster_async_std_basic_cmd() { - let cluster = TestClusterContext::new(3, 0); - - block_on_all_using_async_std(async { - let mut connection = cluster.async_connection(None).await; - redis::cmd("SET") - .arg("test") - .arg("test_data") - .query_async(&mut connection) - .await?; - redis::cmd("GET") - .arg("test") - .clone() - .query_async(&mut connection) - .map_ok(|res: String| { - assert_eq!(res, "test_data"); - }) - .await - }) - .unwrap(); - } - - #[test] + #[serial_test::serial] fn test_async_cluster_can_connect_to_server_that_sends_cluster_slots_without_host_name() { let name = "test_async_cluster_can_connect_to_server_that_sends_cluster_slots_without_host_name"; @@ -665,6 +653,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_can_connect_to_server_that_sends_cluster_slots_with_null_host_name() { let name = "test_async_cluster_can_connect_to_server_that_sends_cluster_slots_with_null_host_name"; @@ -697,6 +686,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_cannot_connect_to_server_with_unknown_host_name() { let name = "test_async_cluster_cannot_connect_to_server_with_unknown_host_name"; let handler = move |cmd: &[u8], _| { @@ -727,6 +717,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_can_connect_to_server_that_sends_cluster_slots_with_partial_nodes_with_unknown_host_name( ) { let name = "test_async_cluster_can_connect_to_server_that_sends_cluster_slots_with_partial_nodes_with_unknown_host_name"; @@ -772,6 +763,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_retries() { let name = "tryagain"; @@ -804,6 +796,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_tryagain_exhaust_retries() { let name = "tryagain_exhaust_retries"; @@ -862,6 +855,7 @@ mod cluster_async { } } #[test] + #[serial_test::serial] fn test_async_cluster_move_error_when_new_node_is_added() { let name = "rebuild_with_extra_nodes"; @@ -1008,7 +1002,7 @@ mod cluster_async { .query_async::<_, Option>(&mut connection) .await; assert_eq!(res, Ok(Some(123))); - // If there is a majority in the topology views, or if it's a 2-nodes cluster, we shall be able to calculate the topology on the first try, + // If there is a majority in the topology views, or if it's a 2-nodes cluster, we shall be able to calculate the topology on the first try, // so each node will be queried only once with CLUSTER SLOTS. // Otherwise, if we don't have a majority, we expect to see the refresh_slots function being called with the maximum retry number. let expected_calls = if has_a_majority || num_of_nodes == 2 {num_of_nodes} else {DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES * num_of_nodes}; @@ -1022,8 +1016,6 @@ mod cluster_async { #[cfg(feature = "tokio-comp")] tokio::time::sleep(sleep_duration).await; - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - async_std::task::sleep(sleep_duration).await; } } panic!("Failed to reach to the expected topology refresh retries. Found={refreshed_calls}, Expected={expected_calls}") @@ -1244,6 +1236,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_refresh_topology_after_moved_error_all_nodes_agree_get_succeed() { let ports = get_ports(3); test_async_cluster_refresh_topology_after_moved_assert_get_succeed_and_expected_retries( @@ -1254,6 +1247,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_refresh_topology_in_client_init_all_nodes_agree_get_succeed() { let ports = get_ports(3); test_async_cluster_refresh_topology_in_client_init_get_succeed( @@ -1263,6 +1257,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_refresh_topology_after_moved_error_with_no_majority_get_succeed() { for num_of_nodes in 2..4 { let ports = get_ports(num_of_nodes); @@ -1275,6 +1270,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_refresh_topology_in_client_init_with_no_majority_get_succeed() { for num_of_nodes in 2..4 { let ports = get_ports(num_of_nodes); @@ -1286,6 +1282,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_refresh_topology_even_with_zero_retries() { let name = "test_async_cluster_refresh_topology_even_with_zero_retries"; @@ -1297,7 +1294,7 @@ mod cluster_async { handler: _handler, .. } = MockEnv::with_client_builder( - ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0) + ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0) // Disable the rate limiter to refresh slots immediately on the MOVED error. .slots_refresh_rate_limit(Duration::from_secs(0), 0), name, @@ -1376,6 +1373,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_reconnect_even_with_zero_retries() { let name = "test_async_cluster_reconnect_even_with_zero_retries"; @@ -1457,6 +1455,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_refresh_slots_rate_limiter_skips_refresh() { let ports = get_ports(3); test_async_cluster_refresh_slots_rate_limiter_helper( @@ -1467,6 +1466,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_refresh_slots_rate_limiter_does_refresh_when_wait_duration_passed() { let ports = get_ports(3); test_async_cluster_refresh_slots_rate_limiter_helper( @@ -1477,6 +1477,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_ask_redirect() { let name = "node"; let completed = Arc::new(AtomicI32::new(0)); @@ -1526,6 +1527,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_ask_save_new_connection() { let name = "node"; let ping_attempts = Arc::new(AtomicI32::new(0)); @@ -1568,6 +1570,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_reset_routing_if_redirect_fails() { let name = "test_async_cluster_reset_routing_if_redirect_fails"; let completed = Arc::new(AtomicI32::new(0)); @@ -1606,6 +1609,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_ask_redirect_even_if_original_call_had_no_route() { let name = "node"; let completed = Arc::new(AtomicI32::new(0)); @@ -1657,6 +1661,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_ask_error_when_new_node_is_added() { let name = "ask_with_extra_nodes"; @@ -1711,6 +1716,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_replica_read() { let name = "node"; @@ -1815,16 +1821,19 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_to_all_primaries() { test_async_cluster_fan_out("FLUSHALL", vec![6379, 6381], None); } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_to_all_nodes() { test_async_cluster_fan_out("CONFIG SET", vec![6379, 6380, 6381, 6382], None); } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_once_to_each_primary_when_no_replicas_are_available() { test_async_cluster_fan_out( "CONFIG SET", @@ -1845,6 +1854,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_once_even_if_primary_has_multiple_slot_ranges() { test_async_cluster_fan_out( "CONFIG SET", @@ -1875,6 +1885,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_route_according_to_passed_argument() { let name = "test_async_cluster_route_according_to_passed_argument"; @@ -1939,6 +1950,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_and_aggregate_numeric_response_with_min() { let name = "test_async_cluster_fan_out_and_aggregate_numeric_response"; let mut cmd = Cmd::new(); @@ -1969,6 +1981,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_and_aggregate_logical_array_response() { let name = "test_async_cluster_fan_out_and_aggregate_logical_array_response"; let mut cmd = Cmd::new(); @@ -2019,6 +2032,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_and_return_one_succeeded_response() { let name = "test_async_cluster_fan_out_and_return_one_succeeded_response"; let mut cmd = Cmd::new(); @@ -2053,6 +2067,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_and_fail_one_succeeded_if_there_are_no_successes() { let name = "test_async_cluster_fan_out_and_fail_one_succeeded_if_there_are_no_successes"; let mut cmd = Cmd::new(); @@ -2085,6 +2100,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_and_return_all_succeeded_response() { let name = "test_async_cluster_fan_out_and_return_all_succeeded_response"; let cmd = cmd("FLUSHALL"); @@ -2111,6 +2127,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_and_fail_all_succeeded_if_there_is_a_single_failure() { let name = "test_async_cluster_fan_out_and_fail_all_succeeded_if_there_is_a_single_failure"; let cmd = cmd("FLUSHALL"); @@ -2144,6 +2161,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_first_succeeded_non_empty_or_all_empty_return_value_ignoring_nil_and_err_resps( ) { let name = @@ -2182,6 +2200,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_first_succeeded_non_empty_or_all_empty_return_err_if_all_resps_are_nil_and_errors( ) { let name = @@ -2215,6 +2234,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_first_succeeded_non_empty_or_all_empty_return_nil_if_all_resp_nil() { let name = "test_async_cluster_first_succeeded_non_empty_or_all_empty_return_nil_if_all_resp_nil"; @@ -2242,6 +2262,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_and_return_map_of_results_for_special_response_policy() { let name = "foo"; let mut cmd = Cmd::new(); @@ -2282,6 +2303,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_fan_out_and_combine_arrays_of_values() { let name = "foo"; let cmd = cmd("KEYS"); @@ -2315,6 +2337,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_split_multi_shard_command_and_combine_arrays_of_values() { let name = "test_async_cluster_split_multi_shard_command_and_combine_arrays_of_values"; let mut cmd = cmd("MGET"); @@ -2355,6 +2378,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_handle_asking_error_in_split_multi_shard_command() { let name = "test_async_cluster_handle_asking_error_in_split_multi_shard_command"; let mut cmd = cmd("MGET"); @@ -2404,6 +2428,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_pass_errors_from_split_multi_shard_command() { let name = "test_async_cluster_pass_errors_from_split_multi_shard_command"; let mut cmd = cmd("MGET"); @@ -2431,6 +2456,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_handle_missing_slots_in_split_multi_shard_command() { let name = "test_async_cluster_handle_missing_slots_in_split_multi_shard_command"; let mut cmd = cmd("MGET"); @@ -2464,6 +2490,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_with_username_and_password() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -2496,6 +2523,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_io_error() { let name = "node"; let completed = Arc::new(AtomicI32::new(0)); @@ -2534,6 +2562,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_non_retryable_error_should_not_retry() { let name = "node"; let completed = Arc::new(AtomicI32::new(0)); @@ -2570,6 +2599,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_read_from_primary() { let name = "node"; let found_ports = Arc::new(std::sync::Mutex::new(Vec::new())); @@ -2627,6 +2657,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_round_robin_read_from_replica() { let name = "node"; let found_ports = Arc::new(std::sync::Mutex::new(Vec::new())); @@ -2713,6 +2744,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_handle_complete_server_disconnect_without_panicking() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -2741,6 +2773,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_test_fast_reconnect() { // Note the 3 seconds connection check to differentiate between notifications and periodic let cluster = TestClusterContext::new_with_cluster_client_builder( @@ -2851,6 +2884,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_restore_resp3_pubsub_state_passive_disconnect() { let redis_ver = std::env::var("REDIS_VERSION").unwrap_or_default(); let use_sharded = redis_ver.starts_with("7."); @@ -3020,6 +3054,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_restore_resp3_pubsub_state_after_scale_out() { let redis_ver = std::env::var("REDIS_VERSION").unwrap_or_default(); let use_sharded = redis_ver.starts_with("7."); @@ -3258,6 +3293,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_resp3_pubsub() { let redis_ver = std::env::var("REDIS_VERSION").unwrap_or_default(); let use_sharded = redis_ver.starts_with("7."); @@ -3372,6 +3408,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_periodic_checks_update_topology_after_failover() { // This test aims to validate the functionality of periodic topology checks by detecting and updating topology changes. // We will repeatedly execute CLUSTER NODES commands against the primary node responsible for slot 0, recording its node ID. @@ -3442,6 +3479,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_recover_disconnected_management_connections() { // This test aims to verify that the management connections used for periodic checks are reconnected, in case that they get killed. // In order to test this, we choose a single node, kill all connections to it which aren't user connections, and then wait until new @@ -3494,6 +3532,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_with_client_name() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -3530,6 +3569,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_reroute_from_replica_if_in_loading_state() { /* Test replica in loading state. The expected behaviour is that the request will be directed to a different replica or the primary. depends on the read from replica policy. */ @@ -3585,6 +3625,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_read_from_primary_when_primary_loading() { // Test primary in loading state. The expected behaviour is that the request will be retried until the primary is no longer in loading state. let name = "test_async_cluster_read_from_primary_when_primary_loading"; @@ -3639,6 +3680,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_can_be_created_with_partial_slot_coverage() { let name = "test_async_cluster_can_be_created_with_partial_slot_coverage"; let slots_config = Some(vec![ @@ -3679,6 +3721,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_reconnect_after_complete_server_disconnect() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -3722,6 +3765,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_reconnect_after_complete_server_disconnect_route_to_many() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -3760,6 +3804,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_blocking_command_when_cluster_drops() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -3787,6 +3832,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_saves_reconnected_connection() { let name = "test_async_cluster_saves_reconnected_connection"; let ping_attempts = Arc::new(AtomicI32::new(0)); @@ -3856,6 +3902,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_periodic_checks_use_management_connection() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -3884,7 +3931,7 @@ mod cluster_async { .expect("Failed executing CLIENT LIST"); let mut client_list_parts = client_list.split('\n'); if client_list_parts - .any(|line| line.contains(MANAGEMENT_CONN_NAME) && line.contains("cmd=cluster")) + .any(|line| line.contains(MANAGEMENT_CONN_NAME) && line.contains("cmd=cluster")) && client_list.matches(MANAGEMENT_CONN_NAME).count() == 1 { return Ok::<_, RedisError>(()); } @@ -3954,6 +4001,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_only_management_connection_is_reconnected_after_connection_failure() { // This test will check two aspects: // 1. Ensuring that after a disconnection in the management connection, a new management connection is established. @@ -4023,6 +4071,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_dont_route_to_a_random_on_non_key_based_cmd() { // This test verifies that non-key-based commands do not get routed to a random node // when no connection is found for the given route. Instead, the appropriate error @@ -4087,6 +4136,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_route_to_random_on_key_based_cmd() { // This test verifies that key-based commands get routed to a random node // when no connection is found for the given route. The command should @@ -4143,6 +4193,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_do_not_retry_when_receiver_was_dropped() { let name = "test_async_cluster_do_not_retry_when_receiver_was_dropped"; let cmd = cmd("FAKE_COMMAND"); @@ -4196,6 +4247,7 @@ mod cluster_async { use super::*; #[test] + #[serial_test::serial] fn test_async_cluster_basic_cmd_with_mtls() { let cluster = TestClusterContext::new_with_mtls(3, 0); block_on_all(async move { @@ -4218,6 +4270,7 @@ mod cluster_async { } #[test] + #[serial_test::serial] fn test_async_cluster_should_not_connect_without_mtls_enabled() { let cluster = TestClusterContext::new_with_mtls(3, 0); block_on_all(async move { diff --git a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs index 29a3c87b48..cfc4bae594 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs @@ -47,6 +47,7 @@ mod test_cluster_scan_async { } #[tokio::test] + #[serial_test::serial] async fn test_async_cluster_scan() { let cluster = TestClusterContext::new(3, 0); let mut connection = cluster.async_connection(None).await; @@ -87,7 +88,8 @@ mod test_cluster_scan_async { } } - #[tokio::test] // test cluster scan with slot migration in the middle + #[tokio::test] + #[serial_test::serial] // test cluster scan with slot migration in the middle async fn test_async_cluster_scan_with_migration() { let cluster = TestClusterContext::new(3, 0); @@ -162,7 +164,8 @@ mod test_cluster_scan_async { assert_eq!(keys, expected_keys); } - #[tokio::test] // test cluster scan with node fail in the middle + #[tokio::test] + #[serial_test::serial] // test cluster scan with node fail in the middle async fn test_async_cluster_scan_with_fail() { let cluster = TestClusterContext::new_with_cluster_client_builder( 3, @@ -224,7 +227,8 @@ mod test_cluster_scan_async { assert!(result.is_err()); } - #[tokio::test] // Test cluster scan with killing all masters during scan + #[tokio::test] + #[serial_test::serial] // Test cluster scan with killing all masters during scan async fn test_async_cluster_scan_with_all_masters_down() { let cluster = TestClusterContext::new_with_cluster_client_builder( 6, @@ -378,6 +382,7 @@ mod test_cluster_scan_async { } #[tokio::test] + #[serial_test::serial] // Test cluster scan with killing all replicas during scan async fn test_async_cluster_scan_with_all_replicas_down() { let cluster = TestClusterContext::new_with_cluster_client_builder( @@ -482,6 +487,7 @@ mod test_cluster_scan_async { assert_eq!(keys, expected_keys); } #[tokio::test] + #[serial_test::serial] // Test cluster scan with setting keys for each iteration async fn test_async_cluster_scan_set_in_the_middle() { let cluster = TestClusterContext::new(3, 0); @@ -541,6 +547,7 @@ mod test_cluster_scan_async { } #[tokio::test] + #[serial_test::serial] // Test cluster scan with deleting keys for each iteration async fn test_async_cluster_scan_dell_in_the_middle() { let cluster = TestClusterContext::new(3, 0); @@ -603,6 +610,7 @@ mod test_cluster_scan_async { } #[tokio::test] + #[serial_test::serial] // Testing cluster scan with Pattern option async fn test_async_cluster_scan_with_pattern() { let cluster = TestClusterContext::new(3, 0); @@ -661,6 +669,7 @@ mod test_cluster_scan_async { } #[tokio::test] + #[serial_test::serial] // Testing cluster scan with TYPE option async fn test_async_cluster_scan_with_type() { let cluster = TestClusterContext::new(3, 0); @@ -719,6 +728,7 @@ mod test_cluster_scan_async { } #[tokio::test] + #[serial_test::serial] // Testing cluster scan with COUNT option async fn test_async_cluster_scan_with_count() { let cluster = TestClusterContext::new(3, 0); @@ -782,6 +792,7 @@ mod test_cluster_scan_async { } #[tokio::test] + #[serial_test::serial] // Testing cluster scan when connection fails in the middle and we get an error // then cluster up again and scanning can continue without any problem async fn test_async_cluster_scan_failover() { diff --git a/glide-core/tests/test_standalone_client.rs b/glide-core/tests/test_standalone_client.rs index 448fa0faa0..5b269dd42c 100644 --- a/glide-core/tests/test_standalone_client.rs +++ b/glide-core/tests/test_standalone_client.rs @@ -193,20 +193,19 @@ mod standalone_client_tests { } fn test_read_from_replica(config: ReadFromReplicaTestConfig) { - let mut mocks = create_primary_mock_with_replicas( + let mut servers = create_primary_mock_with_replicas( config.number_of_initial_replicas - config.number_of_missing_replicas, ); let mut cmd = redis::cmd("GET"); cmd.arg("foo"); - for mock in mocks.iter() { + for server in servers.iter() { for _ in 0..3 { - mock.add_response(&cmd, "$-1\r\n".to_string()); + server.add_response(&cmd, "$-1\r\n".to_string()); } } - let mut addresses = get_mock_addresses(&mocks); - + let mut addresses = get_mock_addresses(&servers); for i in 4 - config.number_of_missing_replicas..4 { addresses.push(redis::ConnectionAddr::Tcp( "foo".to_string(), @@ -221,19 +220,32 @@ mod standalone_client_tests { let mut client = StandaloneClient::create_client(connection_request.into(), None) .await .unwrap(); - for mock in mocks.drain(1..config.number_of_replicas_dropped_after_connection + 1) { - mock.close().await; + logger_core::log_info( + "Test", + format!( + "Closing {} servers after connection established", + config.number_of_replicas_dropped_after_connection + ), + ); + for server in servers.drain(1..config.number_of_replicas_dropped_after_connection + 1) { + server.close().await; } + logger_core::log_info( + "Test", + format!("sending {} messages", config.number_of_requests_sent), + ); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; for _ in 0..config.number_of_requests_sent { let _ = client.send_command(&cmd).await; } }); assert_eq!( - mocks[0].get_number_of_received_commands(), + servers[0].get_number_of_received_commands(), config.expected_primary_reads ); - let mut replica_reads: Vec<_> = mocks + let mut replica_reads: Vec<_> = servers .iter() .skip(1) .map(|mock| mock.get_number_of_received_commands()) @@ -294,7 +306,9 @@ mod standalone_client_tests { test_read_from_replica(ReadFromReplicaTestConfig { read_from: ReadFrom::PreferReplica, expected_primary_reads: 0, - expected_replica_reads: vec![2, 3], + // Since we drop 1 replica after connection establishment + // we expect all reads to be handled by the remaining replicas + expected_replica_reads: vec![3, 3], number_of_replicas_dropped_after_connection: 1, number_of_requests_sent: 6, ..Default::default() diff --git a/glide-core/tests/utilities/mocks.rs b/glide-core/tests/utilities/mocks.rs index 160e8a3189..33b8ae4121 100644 --- a/glide-core/tests/utilities/mocks.rs +++ b/glide-core/tests/utilities/mocks.rs @@ -5,14 +5,15 @@ use futures_intrusive::sync::ManualResetEvent; use redis::{Cmd, ConnectionAddr, Value}; use std::collections::HashMap; use std::io; +use std::io::Read; +use std::io::Write; use std::net::TcpListener; +use std::net::TcpStream as StdTcpStream; use std::str::from_utf8; use std::sync::{ atomic::{AtomicU16, Ordering}, Arc, }; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpStream; use tokio::sync::mpsc::UnboundedSender; pub struct MockedRequest { @@ -29,20 +30,24 @@ pub struct ServerMock { closing_completed_signal: Arc, } -async fn read_from_socket(buffer: &mut Vec, socket: &mut TcpStream) -> Option { - let _ = socket.readable().await; - - loop { - match socket.try_read_buf(buffer) { +fn read_from_socket( + buffer: &mut [u8], + socket: &mut StdTcpStream, + closing_signal: &Arc, +) -> Option { + while !closing_signal.is_set() { + let read_res = socket.read(buffer); // read() is using timeout + match read_res { Ok(0) => { return None; } - Ok(size) => return Some(size), + Ok(size) => { + return Some(size); + } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::Interrupted => { - tokio::task::yield_now().await; continue; } Err(_) => { @@ -50,43 +55,53 @@ async fn read_from_socket(buffer: &mut Vec, socket: &mut TcpStream) -> Optio } } } + // If we reached here, it means we got a signal to terminate + None +} + +/// Escape and print a RESP message +fn log_resp_message(msg: &str) { + logger_core::log_info( + "Test", + format!( + "{:?} {}", + std::thread::current().id(), + msg.replace('\r', "\\r").replace('\n', "\\n") + ), + ); } -async fn receive_and_respond_to_next_message( +fn receive_and_respond_to_next_message( receiver: &mut tokio::sync::mpsc::UnboundedReceiver, - socket: &mut TcpStream, + socket: &mut StdTcpStream, received_commands: &Arc, constant_responses: &HashMap, closing_signal: &Arc, ) -> bool { - let mut buffer = Vec::with_capacity(1024); - let size = tokio::select! { - size = read_from_socket(&mut buffer, socket) => { - let Some(size) = size else { - return false; - }; - size - }, - _ = closing_signal.wait() => { + let mut buffer = vec![0; 1024]; + let size = match read_from_socket(&mut buffer, socket, closing_signal) { + Some(size) => size, + None => { return false; } }; - let message = from_utf8(&buffer[..size]).unwrap().to_string(); + log_resp_message(&message); + let setinfo_count = message.matches("SETINFO").count(); if setinfo_count > 0 { let mut buffer = Vec::new(); for _ in 0..setinfo_count { super::encode_value(&Value::Okay, &mut buffer).unwrap(); } - socket.write_all(&buffer).await.unwrap(); + socket.write_all(&buffer).unwrap(); return true; } if let Some(response) = constant_responses.get(&message) { let mut buffer = Vec::new(); super::encode_value(response, &mut buffer).unwrap(); - socket.write_all(&buffer).await.unwrap(); + socket.write_all(&buffer).unwrap(); return true; } let Ok(request) = receiver.try_recv() else { @@ -94,7 +109,7 @@ async fn receive_and_respond_to_next_message( }; received_commands.fetch_add(1, Ordering::AcqRel); assert_eq!(message, request.expected_message); - socket.write_all(request.response.as_bytes()).await.unwrap(); + socket.write_all(request.response.as_bytes()).unwrap(); true } @@ -127,15 +142,11 @@ impl ServerMock { let closing_signal_clone = closing_signal.clone(); let closing_completed_signal = Arc::new(ManualResetEvent::new(false)); let closing_completed_signal_clone = closing_completed_signal.clone(); - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(1) - .thread_name(format!("ServerMock - {address}")) - .enable_all() - .build() - .unwrap(); - runtime.spawn(async move { - let listener = tokio::net::TcpListener::from_std(listener).unwrap(); - let mut socket = listener.accept().await.unwrap().0; + let address_clone = address.clone(); + std::thread::spawn(move || { + logger_core::log_info("Test", format!("ServerMock started on: {}", address_clone)); + let mut socket: StdTcpStream = listener.accept().unwrap().0; + let _ = socket.set_read_timeout(Some(std::time::Duration::from_millis(10))); while receive_and_respond_to_next_message( &mut receiver, @@ -143,17 +154,25 @@ impl ServerMock { &received_commands_clone, &constant_responses, &closing_signal_clone, - ) - .await - {} + ) {} + + // Terminate the connection + let _ = socket.shutdown(std::net::Shutdown::Both); + // Now notify exit completed closing_completed_signal_clone.set(); + + logger_core::log_info( + "Test", + format!("{:?} ServerMock exited", std::thread::current().id()), + ); }); + Self { request_sender, address, received_commands, - runtime: Some(runtime), + runtime: None, closing_signal, closing_completed_signal, } @@ -186,6 +205,5 @@ impl Mock for ServerMock { impl Drop for ServerMock { fn drop(&mut self) { self.closing_signal.set(); - self.runtime.take().unwrap().shutdown_background(); } }