Skip to content

Commit

Permalink
fix(relay-client): lack of proper connection attempt backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
vmenge committed Jan 29, 2025
1 parent ecadd0b commit 40a4735
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 26 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ clap = { version = "4.5", features = ["derive"] }
color-eyre = "0.6.2"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.15"
tokio-util = "0.7.13"
uuid = "1.11.0"
derive_more = { version = "0.99" }
tracing = "0.1"
Expand Down
1 change: 1 addition & 0 deletions relay-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust-version.workspace = true
[dependencies]
orb-relay-messages = { workspace = true, features = ["client"] }
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tonic = { workspace = true, features = ["tls-roots"] }
derive_more.workspace = true
color-eyre.workspace = true
Expand Down
45 changes: 26 additions & 19 deletions relay-client/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use secrecy::ExposeSecret;
use std::collections::HashMap;
use tokio::{task, time};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tonic::{
transport::{ClientTlsConfig, Endpoint},
Streaming,
Expand Down Expand Up @@ -78,31 +79,31 @@ pub fn run(props: Props) -> (flume::Sender<Msg>, task::JoinHandle<Result<(), Err
loop {
conn_attempts += 1;

match main_loop(
let cancellation_token = CancellationToken::new();
let result = main_loop(
&mut state,
&props,
relay_actor_tx_clone.clone(),
relay_actor_rx.clone(),
cancellation_token.clone(),
)
.await
{
Err(Err::StopRequest) => {
.await;

if let Err(e) = result {
cancellation_token.cancel();
if let Err::StopRequest = e {
return Err(Err::StopRequest);
}
} else if props.opts.max_connection_attempts <= conn_attempts {
return Err(e);
} else {
error!(
"RelayClient errored out {e:?}. Retrying in {}s",
props.opts.connection_timeout.as_secs()
);

Err(e) => {
if props.opts.max_connection_attempts <= conn_attempts {
return Err(e);
} else {
error!(
"RelayClient errored out {e:?}. Retrying in {}s",
props.opts.connection_timeout.as_secs()
);
}
time::sleep(props.opts.connection_backoff).await;
}

Ok(()) => (),
}
};
}
});

Expand All @@ -114,10 +115,11 @@ async fn main_loop(
props: &Props,
relay_actor_tx: flume::Sender<Msg>,
relay_actor_rx: flume::Receiver<Msg>,
cancellation_token: CancellationToken,
) -> Result<(), Err> {
let mut response_stream = time::timeout(
props.opts.connection_timeout,
connect(props, &relay_actor_tx),
connect(props, &relay_actor_tx, cancellation_token),
)
.await
.wrap_err("Timed out trying to establish a connection")??;
Expand Down Expand Up @@ -361,6 +363,7 @@ fn handle_ack(state: &mut State, seq: Seq) {
async fn connect(
props: &Props,
relay_actor_tx: &flume::Sender<Msg>,
cancellation_token: CancellationToken,
) -> Result<Streaming<RelayConnectResponse>, Err> {
let Props {
opts,
Expand Down Expand Up @@ -407,7 +410,11 @@ async fn connect(
.wrap_err("Failed to send RelayConnectRequest")?;

let mut response_stream: Streaming<RelayConnectResponse> = relay_client
.relay_connect(flume_receiver_stream::new(tonic_rx.clone(), 4))
.relay_connect(flume_receiver_stream::new(
tonic_rx.clone(),
4,
cancellation_token,
))
.await?
.into_inner();

Expand Down
21 changes: 18 additions & 3 deletions relay-client/src/flume_receiver_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

/// Creates a `tokio_stream::wrappers::ReceiverStream` from a `flume::Receiver<_>`
/// ## example
Expand All @@ -13,15 +14,29 @@ use tokio_stream::wrappers::ReceiverStream;
pub fn new<T: Send + 'static>(
flume_rx: flume::Receiver<T>,
tokio_mpsc_receiver_buffer: usize,
cancellation_token: CancellationToken,
) -> ReceiverStream<T> {
let (tx, rx) = mpsc::channel(tokio_mpsc_receiver_buffer);

tokio::spawn(async move {
while let Ok(msg) = flume_rx.recv_async().await {
if tx.send(msg).await.is_err() {
break;
loop {
tokio::select! {
biased;

_ = cancellation_token.cancelled() => {
break;
}


msg = flume_rx.recv_async() => {
if tx.send(msg?).await.is_err() {
break;
}
}
}
}

Ok::<_,flume::RecvError>(())
});

ReceiverStream::new(rx)
Expand Down
2 changes: 2 additions & 0 deletions relay-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ pub struct ClientOpts {
auth: Auth,
#[builder(default = Duration::from_secs(20))]
connection_timeout: Duration,
#[builder(default = Duration::from_secs(20))]
connection_backoff: Duration,
#[builder(default = Amount::Infinite)]
max_connection_attempts: Amount,
#[builder(default = Duration::from_secs(20))]
Expand Down
37 changes: 35 additions & 2 deletions relay-client/tests/connect_attempts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use orb_relay_client::{Amount, Auth, Client, ClientOpts};
use orb_relay_messages::relay::{
entity::EntityType, relay_connect_request::Msg, ConnectRequest, ConnectResponse,
};
use std::time::Duration;
use std::time::{Duration, Instant};
use test_server::{IntoRes, TestServer};
use tokio::time;

Expand Down Expand Up @@ -53,7 +53,7 @@ async fn connects() {
#[tokio::test]
async fn tries_to_connect_the_expected_number_of_times_then_gives_up() {
// Arrange
let expected_attempts = 2;
let expected_attempts = 3;
let sv = TestServer::new(0, |attempts, _conn_req, _| {
*attempts += 1;
ConnectResponse {
Expand All @@ -72,6 +72,7 @@ async fn tries_to_connect_the_expected_number_of_times_then_gives_up() {
.auth(Auth::Token(Default::default()))
.max_connection_attempts(Amount::Val(expected_attempts))
.connection_timeout(Duration::from_millis(10))
.connection_backoff(Duration::ZERO)
.build();

// Act
Expand All @@ -83,3 +84,35 @@ async fn tries_to_connect_the_expected_number_of_times_then_gives_up() {
let actual_attempts = sv.state().await;
assert_eq!(*actual_attempts, expected_attempts);
}

/// Checks that the client pauses for `connection_backoff` between failed
/// connection attempts rather than retrying immediately.
///
/// The server answers every connect request with `success: false` and bumps
/// a counter each time its handler runs. With a 50 ms backoff, the test
/// expects exactly three handler invocations to have been recorded once
/// 150 ms have elapsed.
#[tokio::test]
async fn sleeps_for_backoff_period_between_connection_attempts() {
    // Arrange
    let backoff = Duration::from_millis(50);
    let observation_window = Duration::from_millis(150);

    // State is `(attempt counter, creation instant)`; only the counter is
    // inspected by this test.
    let server = TestServer::new((0, Instant::now()), |state, _conn_req, _| {
        state.0 += 1;
        let rejection = ConnectResponse {
            client_id: "doesntmatter".to_string(),
            success: false,
            error: "nothing".to_string(),
        };
        rejection.into_res()
    })
    .await;

    let endpoint = format!("http://{}", server.addr());
    let opts = ClientOpts::entity(EntityType::App)
        .id("foo")
        .namespace("bar")
        .endpoint(endpoint)
        .auth(Auth::Token(Default::default()))
        .max_connection_attempts(Amount::Infinite)
        .connection_backoff(backoff)
        .build();

    // Act
    // Both bindings are kept alive (not `_`) so the client is not dropped
    // while the test is still observing the server.
    let (_client, _handle) = Client::connect(opts);

    // Assert
    time::sleep(observation_window).await;
    let recorded = server.state().await;
    assert_eq!(recorded.0, 3);
}

0 comments on commit 40a4735

Please sign in to comment.