Skip to content

Commit

Permalink
Add connection timeout configuration (valkey-io#2823)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Shoham Elias <shohame@amazon.com>
  • Loading branch information
shohamazon authored Dec 26, 2024
1 parent 8bfa0d8 commit 0bcca10
Show file tree
Hide file tree
Showing 34 changed files with 746 additions and 51 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#### Changes
* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860))
* Java: Bump protobuf (protoc) version ([#2561](https://github.com/valkey-io/valkey-glide/pull/2561), [#2802](https://github.com/valkey-io/valkey-glide/pull/2802)
* Java: Bump protobuf (protoc) version ([#2561](https://github.com/valkey-io/valkey-glide/pull/2561), [#2802](https://github.com/valkey-io/valkey-glide/pull/2802))
* Java: bump `netty` version ([#2777](https://github.com/valkey-io/valkey-glide/pull/2777))
* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799))

* Node, Python, Java: Add connection timeout to client configuration ([#2823](https://github.com/valkey-io/valkey-glide/issues/2823))
#### Breaking Changes

#### Fixes
Expand Down
5 changes: 5 additions & 0 deletions glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub struct GlideConnectionOptions {
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
///
/// This optional field sets the maximum duration to wait when attempting to establish
/// a connection. If `None`, the connection will use `DEFAULT_CONNECTION_TIMEOUT`.
pub connection_timeout: Option<Duration>,
}

/// To enable async support you need to enable the feature: `tokio-comp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ where
push_sender: None,
disconnect_notifier,
discover_az,
connection_timeout: Some(params.connection_timeout),
},
)
.await
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,7 @@ where
push_sender,
disconnect_notifier,
discover_az,
connection_timeout: Some(cluster_params.connection_timeout),
};

let connections = Self::create_initial_connections(
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"FUNCTION STATS" => RouteBy::AllNodes,

b"DBSIZE"
| b"DEBUG"
| b"FLUSHALL"
| b"FLUSHDB"
| b"FT._ALIASLIST"
Expand Down Expand Up @@ -717,7 +718,6 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"COMMAND LIST"
| b"COMMAND"
| b"CONFIG GET"
| b"DEBUG"
| b"ECHO"
| b"FUNCTION LIST"
| b"LASTSAVE"
Expand Down
12 changes: 8 additions & 4 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use versions::Versioning;

pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1);
pub const DEFAULT_RETRIES: u32 = 3;
/// Note: If you change the default value, make sure to change the documentation in *all* wrappers.
pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
/// Note: If you change the default value, make sure to change the documentation in *all* wrappers.
pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const FINISHED_SCAN_CURSOR: &str = "finished";

/// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
Expand Down Expand Up @@ -571,8 +572,9 @@ async fn create_cluster_client(
Some(PeriodicCheck::ManualInterval(interval)) => Some(interval),
None => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL),
};
let connection_timeout = to_duration(request.connection_timeout, DEFAULT_CONNECTION_TIMEOUT);
let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes)
.connection_timeout(INTERNAL_CONNECTION_TIMEOUT)
.connection_timeout(connection_timeout)
.retries(DEFAULT_RETRIES);
let read_from_strategy = request.read_from.unwrap_or_default();
builder = builder.read_from(match read_from_strategy {
Expand Down Expand Up @@ -718,6 +720,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
"\nStandalone mode"
};
let request_timeout = format_optional_value("Request timeout", request.request_timeout);
let connection_timeout =
format_optional_value("Connection timeout", request.connection_timeout);
let database_id = format!("\ndatabase ID: {}", request.database_id);
let rfr_strategy = request
.read_from
Expand Down Expand Up @@ -774,7 +778,7 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
);

format!(
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{connection_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
)
}

Expand Down
20 changes: 17 additions & 3 deletions glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::task;
use tokio::time::timeout;
use tokio_retry2::{Retry, RetryError};

use super::{run_with_timeout, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT};
use super::{run_with_timeout, DEFAULT_CONNECTION_TIMEOUT};

/// The reason behind the call to `reconnect()`
#[derive(PartialEq, Eq, Debug, Clone)]
Expand Down Expand Up @@ -71,7 +71,11 @@ async fn get_multiplexed_connection(
connection_options: &GlideConnectionOptions,
) -> RedisResult<MultiplexedConnection> {
run_with_timeout(
Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
Some(
connection_options
.connection_timeout
.unwrap_or(DEFAULT_CONNECTION_TIMEOUT),
),
client.get_multiplexed_async_connection(connection_options.clone()),
)
.await
Expand Down Expand Up @@ -113,6 +117,7 @@ async fn create_connection(
retry_strategy: RetryStrategy,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<ReconnectingConnection, (ReconnectingConnection, RedisError)> {
let client = &connection_backend.connection_info;
let connection_options = GlideConnectionOptions {
Expand All @@ -121,6 +126,7 @@ async fn create_connection(
TokioDisconnectNotifier::new(),
)),
discover_az,
connection_timeout: Some(connection_timeout),
};
let action = || async {
get_multiplexed_connection(client, &connection_options)
Expand Down Expand Up @@ -206,6 +212,7 @@ impl ReconnectingConnection {
tls_mode: TlsMode,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<ReconnectingConnection, (ReconnectingConnection, RedisError)> {
log_debug(
"connection creation",
Expand All @@ -218,7 +225,14 @@ impl ReconnectingConnection {
connection_available_signal: ManualResetEvent::new(true),
client_dropped_flagged: AtomicBool::new(false),
};
create_connection(backend, connection_retry_strategy, push_sender, discover_az).await
create_connection(
backend,
connection_retry_strategy,
push_sender,
discover_az,
connection_timeout,
)
.await
}

pub(crate) fn node_address(&self) -> String {
Expand Down
10 changes: 10 additions & 0 deletions glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use super::get_redis_connection_info;
use super::reconnecting_connection::{ReconnectReason, ReconnectingConnection};
use super::{to_duration, DEFAULT_CONNECTION_TIMEOUT};
use super::{ConnectionRequest, NodeAddress, TlsMode};
use crate::client::types::ReadFrom as ClientReadFrom;
use crate::retry_strategies::RetryStrategy;
Expand All @@ -15,6 +16,7 @@ use redis::{PushInfo, RedisError, RedisResult, Value};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use telemetrylib::Telemetry;
use tokio::sync::mpsc;
use tokio::task;
Expand Down Expand Up @@ -130,6 +132,11 @@ impl StandaloneClient {
Some(ClientReadFrom::AZAffinity(_))
);

let connection_timeout = to_duration(
connection_request.connection_timeout,
DEFAULT_CONNECTION_TIMEOUT,
);

let mut stream = stream::iter(connection_request.addresses.iter())
.map(|address| async {
get_connection_and_replication_info(
Expand All @@ -143,6 +150,7 @@ impl StandaloneClient {
tls_mode.unwrap_or(TlsMode::NoTls),
&push_sender,
discover_az,
connection_timeout,
)
.await
.map_err(|err| (format!("{}:{}", address.host, address.port), err))
Expand Down Expand Up @@ -552,6 +560,7 @@ async fn get_connection_and_replication_info(
tls_mode: TlsMode,
push_sender: &Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<(ReconnectingConnection, Value), (ReconnectingConnection, RedisError)> {
let result = ReconnectingConnection::new(
address,
Expand All @@ -560,6 +569,7 @@ async fn get_connection_and_replication_info(
tls_mode,
push_sender.clone(),
discover_az,
connection_timeout,
)
.await;
let reconnecting_connection = match result {
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/client/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct ConnectionRequest {
pub addresses: Vec<NodeAddress>,
pub cluster_mode_enabled: bool,
pub request_timeout: Option<u32>,
pub connection_timeout: Option<u32>,
pub connection_retry_strategy: Option<ConnectionRetryStrategy>,
pub periodic_checks: Option<PeriodicCheck>,
pub pubsub_subscriptions: Option<redis::PubSubSubscriptionInfo>,
Expand Down Expand Up @@ -147,6 +148,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
.collect();
let cluster_mode_enabled = value.cluster_mode_enabled;
let request_timeout = none_if_zero(value.request_timeout);
let connection_timeout = none_if_zero(value.connection_timeout);
let connection_retry_strategy =
value
.connection_retry_strategy
Expand Down Expand Up @@ -214,6 +216,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
addresses,
cluster_mode_enabled,
request_timeout,
connection_timeout,
connection_retry_strategy,
periodic_checks,
pubsub_subscriptions,
Expand Down
3 changes: 2 additions & 1 deletion glide-core/src/protobuf/connection_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message AuthenticationInfo {

enum ProtocolVersion {
RESP3 = 0;
RESP2 = 1;
RESP2 = 1;
}

message PeriodicChecksManualInterval {
Expand Down Expand Up @@ -71,6 +71,7 @@ message ConnectionRequest {
PubSubSubscriptions pubsub_subscriptions = 13;
uint32 inflight_requests_limit = 14;
string client_az = 15;
uint32 connection_timeout = 16;
}

message ConnectionRetryStrategy {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import lombok.Getter;
import lombok.experimental.SuperBuilder;

/**
* Advanced configuration settings class for creating a client. Shared settings for standalone and
* cluster clients.
*/
@Getter
@SuperBuilder
public abstract class AdvancedBaseClientConfiguration {

/**
* The duration in milliseconds to wait for a TCP/TLS connection to complete. This applies both
* during initial client creation and any reconnections that may occur during request processing.
* **Note**: A high connection timeout may lead to prolonged blocking of the entire command
* pipeline. If not explicitly set, a default value of 250 milliseconds will be used.
*/
private final Integer connectionTimeout;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import glide.api.GlideClient;
import lombok.Getter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* Represents advanced configuration settings for a Standalone {@link GlideClient} used in {@link
* GlideClientConfiguration}.
*
* @example
* <pre>{@code
* AdvancedGlideClientConfiguration config = AdvancedGlideClientConfiguration.builder()
* .connectionTimeout(500)
* .build();
* }</pre>
*/
@Getter
@SuperBuilder
@ToString
public class AdvancedGlideClientConfiguration extends AdvancedBaseClientConfiguration {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import glide.api.GlideClusterClient;
import lombok.Getter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* Represents advanced configuration settings for a Standalone {@link GlideClusterClient} used in
* {@link GlideClusterClientConfiguration}.
*
* @example
* <pre>{@code
* AdvancedGlideClusterClientConfiguration config = AdvancedGlideClusterClientConfiguration.builder()
* .connectionTimeout(500)
* .build();
* }</pre>
*/
@Getter
@SuperBuilder
@ToString
public class AdvancedGlideClusterClientConfiguration extends AdvancedBaseClientConfiguration {}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public abstract class BaseClientConfiguration {
* The duration in milliseconds that the client should wait for a request to complete. This
* duration encompasses sending the request, awaiting for a response from the server, and any
* required reconnections or retries. If the specified timeout is exceeded for a pending request,
* it will result in a timeout error. If not set, a default value will be used.
* it will result in a timeout error. If not explicitly set, a default value of 250 milliseconds
* will be used.
*/
private final Integer requestTimeout;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* .clientName("GLIDE")
* .subscriptionConfiguration(subscriptionConfiguration)
* .inflightRequestsLimit(1000)
* .advancedConfiguration(AdvancedGlideClientConfiguration.builder().connectionTimeout(500).build())
* .build();
* }</pre>
*/
Expand All @@ -39,4 +40,7 @@ public class GlideClientConfiguration extends BaseClientConfiguration {

/** Subscription configuration for the current client. */
private final StandaloneSubscriptionConfiguration subscriptionConfiguration;

/** Advanced configuration settings for the client. */
private final AdvancedGlideClientConfiguration advancedConfiguration;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* .clientName("GLIDE")
* .subscriptionConfiguration(subscriptionConfiguration)
* .inflightRequestsLimit(1000)
* .advancedConfiguration(AdvancedGlideClusterClientConfiguration.builder().connectionTimeout(500).build())
* .build();
* }</pre>
*/
Expand All @@ -32,4 +33,7 @@ public class GlideClusterClientConfiguration extends BaseClientConfiguration {

/** Subscription configuration for the current client. */
private final ClusterSubscriptionConfiguration subscriptionConfiguration;

/** Advanced configuration settings for the client. */
private final AdvancedGlideClusterClientConfiguration advancedConfiguration;
}
Loading

0 comments on commit 0bcca10

Please sign in to comment.