Skip to content

Commit

Permalink
Feature/replace credes (valkey-io#2651)
Browse files Browse the repository at this point in the history
* arrange typos and clean code

Signed-off-by: avifenesh <aviarchi1994@gmail.com>

* Add support for re-authentication on NOAUTH error

Signed-off-by: avifenesh <aviarchi1994@gmail.com>

* Add integration tests for password replacement in Redis connections

Signed-off-by: avifenesh <aviarchi1994@gmail.com>

* Implement password replacement functionality in Client and StandaloneClient

Signed-off-by: avifenesh <aviarchi1994@gmail.com>

* Add ReplaceConnectionPassword message to protobuf for password reset functionality

Signed-off-by: avifenesh <aviarchi1994@gmail.com>

* Add replace_connection_password method to manage connection password updates

Signed-off-by: avifenesh <aviarchi1994@gmail.com>

* Update CHANGELOG.md to include support for replacing connection configured password

Signed-off-by: avifenesh <aviarchi1994@gmail.com>

* Node: add repalceConnectionPassword method to manage connection password updates

Signed-off-by: jhpung <jhpung.dev@gmail.com>
Signed-off-by: avifenesh <aviarchi1994@gmail.com>

---------

Signed-off-by: avifenesh <aviarchi1994@gmail.com>
Signed-off-by: jhpung <jhpung.dev@gmail.com>
Co-authored-by: jhpung <jhpung.dev@gmail.com>
  • Loading branch information
avifenesh and jhpung authored Nov 14, 2024
1 parent 664cc8e commit 7d65d4e
Show file tree
Hide file tree
Showing 23 changed files with 1,098 additions and 129 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ jobs:
working-directory: ./python
run: |
source .env/bin/activate
pip install -r dev_requirements.txt
cd python/tests/
pytest --asyncio-mode=auto --html=pytest_report.html --self-contained-html
Expand Down Expand Up @@ -177,6 +178,7 @@ jobs:
working-directory: ./python
run: |
source .env/bin/activate
pip install -r dev_requirements.txt
cd python/tests/
pytest --asyncio-mode=auto -k test_pubsub --html=pytest_report.html --self-contained-html
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node, Python: Adding support for replacing connection configured password ([#2651](https://github.com/valkey-io/valkey-glide/pull/2651))
* Node: Add FT._ALIASLIST command([#2652](https://github.com/valkey-io/valkey-glide/pull/2652))
* Python: Python: `FT._ALIASLIST` command added([#2638](https://github.com/valkey-io/valkey-glide/pull/2638))
* Node: alias commands added: FT.ALIASADD, FT.ALIADDEL, FT.ALIASUPDATE([#2596](https://github.com/valkey-io/valkey-glide/pull/2596))
Expand Down
6 changes: 2 additions & 4 deletions glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ repository = "https://github.com/redis-rs/redis-rs"
documentation = "https://docs.rs/redis"
license = "BSD-3-Clause"
edition = "2021"
rust-version = "1.65"
rust-version = "1.67"
readme = "../README.md"

[package.metadata.docs.rs]
Expand Down Expand Up @@ -47,7 +47,6 @@ pin-project-lite = { version = "0.2", optional = true }
tokio-util = { version = "0.7", optional = true }
tokio = { version = "1", features = ["rt", "net", "time", "sync"] }
socket2 = { version = "0.5", features = ["all"], optional = true }
fast-math = { version = "0.1.1", optional = true }
dispose = { version = "0.5.0", optional = true }

# Only needed for the connection manager
Expand All @@ -67,7 +66,7 @@ dashmap = { version = "6.0", optional = true }
async-trait = { version = "0.1.24", optional = true }

# Only needed for tokio support
tokio-retry2 = {version = "0.5", features = ["jitter"], optional = true}
tokio-retry2 = { version = "0.5", features = ["jitter"], optional = true }

# Only needed for native tls
native-tls = { version = "0.2", optional = true }
Expand Down Expand Up @@ -125,7 +124,6 @@ aio = [
"tokio-util/codec",
"combine/tokio",
"async-trait",
"fast-math",
"dispose",
]
geospatial = []
Expand Down
128 changes: 109 additions & 19 deletions glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use std::time::Duration;
#[cfg(feature = "tokio-comp")]
use tokio_util::codec::Decoder;

// Default connection timeout in ms
const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);

// Senders which the result of a single request are sent through
type PipelineOutput = oneshot::Sender<RedisResult<Value>>;

Expand Down Expand Up @@ -76,7 +79,7 @@ struct PipelineMessage<S> {
/// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream`
/// and `Sink`.
#[derive(Clone)]
struct Pipeline<SinkItem> {
pub(crate) struct Pipeline<SinkItem> {
sender: mpsc::Sender<PipelineMessage<SinkItem>>,
push_manager: Arc<ArcSwap<PushManager>>,
is_stream_closed: Arc<AtomicBool>,
Expand Down Expand Up @@ -399,6 +402,7 @@ where
self.push_manager.store(Arc::new(push_manager));
}

/// Checks if the pipeline is closed.
pub fn is_closed(&self) -> bool {
self.is_stream_closed.load(Ordering::Relaxed)
}
Expand All @@ -413,6 +417,7 @@ pub struct MultiplexedConnection {
response_timeout: Duration,
protocol: ProtocolVersion,
push_manager: PushManager,
password: Option<String>,
}

impl Debug for MultiplexedConnection {
Expand Down Expand Up @@ -455,35 +460,28 @@ impl MultiplexedConnection {
where
C: Unpin + AsyncRead + AsyncWrite + Send + 'static,
{
fn boxed(
f: impl Future<Output = ()> + Send + 'static,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(f)
}

#[cfg(not(feature = "tokio-comp"))]
compile_error!("tokio-comp feature is required for aio feature");

let redis_connection_info = &connection_info.redis;
let codec = ValueCodec::default()
.framed(stream)
.and_then(|msg| async move { msg });
let (mut pipeline, driver) =
Pipeline::new(codec, glide_connection_options.disconnect_notifier);
let driver = boxed(driver);
let driver = Box::pin(driver);
let pm = PushManager::default();
if let Some(sender) = glide_connection_options.push_sender {
pm.replace_sender(sender);
}

pipeline.set_push_manager(pm.clone()).await;
let mut con = MultiplexedConnection {
pipeline,
db: connection_info.redis.db,
response_timeout,
push_manager: pm,
protocol: redis_connection_info.protocol,
};

let mut con = MultiplexedConnection::builder(pipeline)
.with_db(connection_info.redis.db)
.with_response_timeout(response_timeout)
.with_push_manager(pm)
.with_protocol(connection_info.redis.protocol)
.with_password(connection_info.redis.password.clone())
.build()
.await?;

let driver = {
let auth = setup_connection(&connection_info.redis, &mut con);

Expand All @@ -502,6 +500,7 @@ impl MultiplexedConnection {
}
}
};

Ok((con, driver))
}

Expand Down Expand Up @@ -575,6 +574,97 @@ impl MultiplexedConnection {
self.push_manager = push_manager.clone();
self.pipeline.set_push_manager(push_manager).await;
}

/// Replace the password used to authenticate with the server.
/// If `None` is provided, the password will be removed.
pub async fn update_connection_password(
&mut self,
password: Option<String>,
) -> RedisResult<Value> {
self.password = password;
Ok(Value::Okay)
}

/// Creates a new `MultiplexedConnectionBuilder` for constructing a `MultiplexedConnection`.
pub(crate) fn builder(pipeline: Pipeline<Vec<u8>>) -> MultiplexedConnectionBuilder {
MultiplexedConnectionBuilder::new(pipeline)
}
}

/// A builder for creating `MultiplexedConnection` instances.
pub struct MultiplexedConnectionBuilder {
pipeline: Pipeline<Vec<u8>>,
db: Option<i64>,
response_timeout: Option<Duration>,
push_manager: Option<PushManager>,
protocol: Option<ProtocolVersion>,
password: Option<String>,
}

impl MultiplexedConnectionBuilder {
/// Creates a new builder with the required pipeline
pub(crate) fn new(pipeline: Pipeline<Vec<u8>>) -> Self {
Self {
pipeline,
db: None,
response_timeout: None,
push_manager: None,
protocol: None,
password: None,
}
}

/// Sets the database index for the `MultiplexedConnectionBuilder`.
pub fn with_db(mut self, db: i64) -> Self {
self.db = Some(db);
self
}

/// Sets the response timeout for the `MultiplexedConnectionBuilder`.
pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
self.response_timeout = Some(timeout);
self
}

/// Sets the push manager for the `MultiplexedConnectionBuilder`.
pub fn with_push_manager(mut self, push_manager: PushManager) -> Self {
self.push_manager = Some(push_manager);
self
}

/// Sets the protocol version for the `MultiplexedConnectionBuilder`.
pub fn with_protocol(mut self, protocol: ProtocolVersion) -> Self {
self.protocol = Some(protocol);
self
}

/// Sets the password for the `MultiplexedConnectionBuilder`.
pub fn with_password(mut self, password: Option<String>) -> Self {
self.password = password;
self
}

/// Builds and returns a new `MultiplexedConnection` instance using the configured settings.
pub async fn build(self) -> RedisResult<MultiplexedConnection> {
let db = self.db.unwrap_or_default();
let response_timeout = self
.response_timeout
.unwrap_or(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT);
let push_manager = self.push_manager.unwrap_or_default();
let protocol = self.protocol.unwrap_or_default();
let password = self.password;

let con = MultiplexedConnection {
pipeline: self.pipeline,
db,
response_timeout,
push_manager,
protocol,
password,
};

Ok(con)
}
}

impl ConnectionLike for MultiplexedConnection {
Expand Down
4 changes: 2 additions & 2 deletions glide-core/redis-rs/redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ where

/// Returns the connection status.
///
/// The connection is open until any `read_response` call recieved an
/// The connection is open until any `read_response` call received an
/// invalid response from the server (most likely a closed or dropped
/// connection, otherwise a Redis protocol error). When using unix
/// sockets the connection is open until writing a command failed with a
Expand Down Expand Up @@ -808,7 +808,7 @@ where
self.refresh_slots()?;

// Given that there are commands that need to be retried, it means something in the cluster
// topology changed. Execute each command seperately to take advantage of the existing
// topology changed. Execute each command separately to take advantage of the existing
// retry logic that handles these cases.
for retry_idx in to_retry {
let cmd = &cmds[retry_idx];
Expand Down
Loading

0 comments on commit 7d65d4e

Please sign in to comment.