Skip to content

Commit

Permalink
Fix shutdown race-condition by introducing a flush_timeout before dro…
Browse files Browse the repository at this point in the history
…pping data (#2821)

Resolves: #2556

### What
We've always had a race condition during startup where a viewer might
not exist despite a client trying to connect to it.

For users of spawn we could have added a bandaid here by probably
increasing this sleep:

https://github.com/rerun-io/rerun/blob/main/rerun_py/rerun_sdk/rerun/sinks.py#L180

This wouldn't resolve issues for other use-cases of manually launching a
viewer and client without tight orchestration.

Instead, we introduce a timeout (which can optionally be set to None),
to use during the disconnected checks when we are flushing. This gives
power-users the option to specify None and reduce risk of losing data
(at an increased risk of blocking during flush when things go wrong).

This PR also bumps a couple of debug logs up to warnings to make it more
clear when data is being dropped.

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/2821) (if
applicable)

- [PR Build Summary](https://build.rerun.io/pr/2821)
- [Docs
preview](https://rerun.io/preview/pr%3Ajleibs%2Ftimeout_disconnects/docs)
- [Examples
preview](https://rerun.io/preview/pr%3Ajleibs%2Ftimeout_disconnects/examples)
  • Loading branch information
jleibs authored Jul 26, 2023
1 parent f74dc2d commit 5b64e66
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 84 deletions.
2 changes: 1 addition & 1 deletion crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod recording_stream;
pub use self::msg_sender::{MsgSender, MsgSenderError};
pub use self::recording_stream::{RecordingStream, RecordingStreamBuilder};

pub use re_sdk_comms::default_server_addr;
pub use re_sdk_comms::{default_flush_timeout, default_server_addr};

pub use re_log_types::{ApplicationId, EntityPath, LegacyComponent, StoreId, StoreKind};
pub use re_types::ComponentName;
Expand Down
8 changes: 6 additions & 2 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,14 @@ pub struct TcpSink {
impl TcpSink {
/// Connect to the given address in a background thread.
/// Retries until successful.
///
/// `flush_timeout` is the minimum time the [`TcpSink`] will wait during a flush
/// before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
#[inline]
pub fn new(addr: std::net::SocketAddr) -> Self {
pub fn new(addr: std::net::SocketAddr, flush_timeout: Option<std::time::Duration>) -> Self {
Self {
client: re_sdk_comms::Client::new(addr),
client: re_sdk_comms::Client::new(addr, flush_timeout),
}
}
}
Expand Down
22 changes: 17 additions & 5 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,20 +219,28 @@ impl RecordingStreamBuilder {
/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
/// remote Rerun instance.
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// ## Example
///
/// ```no_run
/// let rec_stream = re_sdk::RecordingStreamBuilder::new("my_app")
/// .connect(re_sdk::default_server_addr())?;
/// .connect(re_sdk::default_server_addr(), re_sdk::default_flush_timeout())?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub fn connect(self, addr: std::net::SocketAddr) -> RecordingStreamResult<RecordingStream> {
pub fn connect(
self,
addr: std::net::SocketAddr,
flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<RecordingStream> {
let (enabled, store_info, batcher_config) = self.into_args();
if enabled {
RecordingStream::new(
store_info,
batcher_config,
Box::new(crate::log_sink::TcpSink::new(addr)),
Box::new(crate::log_sink::TcpSink::new(addr, flush_timeout)),
)
} else {
re_log::debug!("Rerun disabled - call to connect() ignored");
Expand Down Expand Up @@ -850,11 +858,15 @@ impl RecordingStream {
/// Swaps the underlying sink for a [`crate::log_sink::TcpSink`] sink pre-configured to use
/// the specified address.
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn connect(&self, addr: std::net::SocketAddr) {
self.set_sink(Box::new(crate::log_sink::TcpSink::new(addr)));
pub fn connect(&self, addr: std::net::SocketAddr, flush_timeout: Option<std::time::Duration>) {
self.set_sink(Box::new(crate::log_sink::TcpSink::new(addr, flush_timeout)));
}

/// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
Expand Down
37 changes: 27 additions & 10 deletions crates/re_sdk_comms/src/buffered_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ pub struct Client {

impl Client {
/// Connect via TCP to this log server.
pub fn new(addr: SocketAddr) -> Self {
///
/// `flush_timeout` is the minimum time the `TcpClient` will wait during a
/// flush before potentially dropping data. Note: Passing `None` here can
/// cause a call to `flush` to block indefinitely if a connection cannot be
/// established.
pub fn new(addr: SocketAddr, flush_timeout: Option<std::time::Duration>) -> Self {
re_log::debug!("Connecting to remote {addr}…");

// TODO(emilk): keep track of how much memory is in each pipe
Expand Down Expand Up @@ -88,7 +93,7 @@ impl Client {
let send_join = std::thread::Builder::new()
.name("tcp_sender".into())
.spawn(move || {
tcp_sender(addr, &packet_rx, &send_quit_rx, &flushed_tx);
tcp_sender(addr, flush_timeout, &packet_rx, &send_quit_rx, &flushed_tx);
})
.expect("Failed to spawn thread");

Expand Down Expand Up @@ -250,11 +255,12 @@ fn msg_encode(

fn tcp_sender(
addr: SocketAddr,
flush_timeout: Option<std::time::Duration>,
packet_rx: &Receiver<PacketMsg>,
quit_rx: &Receiver<InterruptMsg>,
flushed_tx: &Sender<FlushedMsg>,
) {
let mut tcp_client = crate::tcp_client::TcpClient::new(addr);
let mut tcp_client = crate::tcp_client::TcpClient::new(addr, flush_timeout);
// Once this flag has been set, we will drop all messages if the tcp_client is
// no longer connected.
let mut drop_if_disconnected = false;
Expand Down Expand Up @@ -311,36 +317,47 @@ fn send_until_success(
quit_rx: &Receiver<InterruptMsg>,
) -> Option<InterruptMsg> {
// Early exit if tcp_client is disconnected
if drop_if_disconnected && tcp_client.has_disconnected() {
re_log::debug_once!("Dropping messages because we're disconnected.");
if drop_if_disconnected && tcp_client.has_timed_out_for_flush() {
re_log::warn_once!("Dropping messages because tcp client has timed out.");
return None;
}

if let Err(err) = tcp_client.send(packet) {
if drop_if_disconnected {
re_log::debug_once!("Dropping messages because we're disconnected.");
if drop_if_disconnected && tcp_client.has_timed_out_for_flush() {
re_log::warn_once!("Dropping messages because tcp client has timed out.");
return None;
}
// If this is the first time we fail to send the message, produce a warning.
re_log::warn!("Failed to send message: {err}");
re_log::debug!("Failed to send message: {err}");

let mut attempts = 1;
let mut sleep_ms = 100;

loop {
select! {
recv(quit_rx) -> _quit_msg => {
re_log::debug_once!("Dropping messages because we're disconnected or quitting.");
re_log::warn_once!("Dropping messages because tcp client has timed out or quitting.");
return Some(_quit_msg.unwrap_or(InterruptMsg::Quit));
}
default(std::time::Duration::from_millis(sleep_ms)) => {
if let Err(new_err) = tcp_client.send(packet) {
attempts += 1;
if attempts == 3 {
re_log::warn!("Failed to send message after {attempts} attempts: {err}");
}

if drop_if_disconnected && tcp_client.has_timed_out_for_flush() {
re_log::warn_once!("Dropping messages because tcp client has timed out.");
return None;
}

const MAX_SLEEP_MS : u64 = 3000;

sleep_ms = (sleep_ms * 2).min(MAX_SLEEP_MS);

// Only produce subsequent warnings once we've saturated the back-off
if sleep_ms == MAX_SLEEP_MS && new_err.to_string() != err.to_string() {
re_log::warn!("Still failing to send message: {err}");
re_log::warn!("Still failing to send message after {attempts} attempts: {err}");
}
} else {
return None;
Expand Down
6 changes: 6 additions & 0 deletions crates/re_sdk_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,9 @@ pub const DEFAULT_SERVER_PORT: u16 = 9876;
pub fn default_server_addr() -> std::net::SocketAddr {
std::net::SocketAddr::from(([127, 0, 0, 1], DEFAULT_SERVER_PORT))
}

/// The default amount of time to wait for the TCP connection to resume during a flush
#[allow(clippy::unnecessary_wraps)]
pub fn default_flush_timeout() -> Option<std::time::Duration> {
Some(std::time::Duration::from_secs(2))
}
141 changes: 85 additions & 56 deletions crates/re_sdk_comms/src/tcp_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
io::Write,
net::{SocketAddr, TcpStream},
time::{Duration, Instant},
};

#[derive(thiserror::Error, Debug)]
Expand All @@ -21,36 +22,41 @@ pub enum ClientError {
/// State of the [`TcpStream`]
///
/// Because the [`TcpClient`] lazily connects on [`TcpClient::send`], it needs a
/// very simple state machine to track the state of the connection. A trinary
/// state is used to specifically differentiate between
/// [`TcpStreamState::Pending`] which is still a nominal state for any new tcp
/// connection, and [`TcpStreamState::Disconnected`] which implies either a
/// failure to connect, or an error on an already established stream.
/// very simple state machine to track the state of the connection.
/// [`TcpStreamState::Pending`] is the nominal state for any new TCP connection
/// when we successfully connect, we transition to [`TcpStreamState::Connected`].
enum TcpStreamState {
/// The [`TcpStream`] is yet to be connected.
///
/// Tracks the duration and connection attempts since the last time the client
/// was `Connected.`
///
/// Behavior: Try to connect on next [`TcpClient::connect`] or [`TcpClient::send`].
///
/// Transitions:
/// - Pending -> Connected on successful connection.
/// - Pending -> Disconnected on failed connection.
Pending,
/// - Pending -> Pending on failed connection.
Pending {
start_time: Instant,
num_attempts: usize,
},

/// A healthy [`TcpStream`] ready to send packets
///
/// Behavior: Send packets on [`TcpClient::send`]
///
/// Transitions:
/// - Connected -> Disconnected on send error
/// - Connected -> Pending on send error
Connected(TcpStream),
}

/// A broken [`TcpStream`] which experienced a failure to connect or send.
///
/// Behavior: Try to re-connect on next [`TcpClient::connect`] or [`TcpClient::send`]
///
/// Transitions:
/// - Disconnected -> Connected on successful connection.
Disconnected,
impl TcpStreamState {
fn reset() -> Self {
Self::Pending {
start_time: Instant::now(),
num_attempts: 0,
}
}
}

/// Connect to a rerun server and send log messages.
Expand All @@ -59,52 +65,58 @@ enum TcpStreamState {
pub struct TcpClient {
addr: SocketAddr,
stream_state: TcpStreamState,
}

impl Default for TcpClient {
fn default() -> Self {
Self::new(crate::default_server_addr())
}
flush_timeout: Option<Duration>,
}

impl TcpClient {
pub fn new(addr: SocketAddr) -> Self {
pub fn new(addr: SocketAddr, flush_timeout: Option<Duration>) -> Self {
Self {
addr,
stream_state: TcpStreamState::Pending,
stream_state: TcpStreamState::reset(),
flush_timeout,
}
}

/// Returns `false` on failure. Does nothing if already connected.
///
/// [`Self::send`] will call this.
pub fn connect(&mut self) -> Result<(), ClientError> {
if let TcpStreamState::Connected(_) = self.stream_state {
Ok(())
} else {
re_log::debug!("Connecting to {:?}…", self.addr);
let timeout = std::time::Duration::from_secs(5);
match TcpStream::connect_timeout(&self.addr, timeout) {
Ok(mut stream) => {
re_log::debug!("Connected to {:?}.", self.addr);
if let Err(err) = stream.write(&crate::PROTOCOL_VERSION.to_le_bytes()) {
self.stream_state = TcpStreamState::Disconnected;
Err(ClientError::Send {
match self.stream_state {
TcpStreamState::Connected(_) => Ok(()),
TcpStreamState::Pending {
start_time,
num_attempts,
} => {
re_log::debug!("Connecting to {:?}…", self.addr);
let timeout = std::time::Duration::from_secs(5);
match TcpStream::connect_timeout(&self.addr, timeout) {
Ok(mut stream) => {
re_log::debug!("Connected to {:?}.", self.addr);
if let Err(err) = stream.write(&crate::PROTOCOL_VERSION.to_le_bytes()) {
self.stream_state = TcpStreamState::Pending {
start_time,
num_attempts: num_attempts + 1,
};
Err(ClientError::Send {
addr: self.addr,
err,
})
} else {
self.stream_state = TcpStreamState::Connected(stream);
Ok(())
}
}
Err(err) => {
self.stream_state = TcpStreamState::Pending {
start_time,
num_attempts: num_attempts + 1,
};
Err(ClientError::Connect {
addr: self.addr,
err,
})
} else {
self.stream_state = TcpStreamState::Connected(stream);
Ok(())
}
}
Err(err) => {
self.stream_state = TcpStreamState::Disconnected;
Err(ClientError::Connect {
addr: self.addr,
err,
})
}
}
}
}
Expand All @@ -118,15 +130,15 @@ impl TcpClient {
if let TcpStreamState::Connected(stream) = &mut self.stream_state {
re_log::trace!("Sending a packet of size {}…", packet.len());
if let Err(err) = stream.write(&(packet.len() as u32).to_le_bytes()) {
self.stream_state = TcpStreamState::Disconnected;
self.stream_state = TcpStreamState::reset();
return Err(ClientError::Send {
addr: self.addr,
err,
});
}

if let Err(err) = stream.write(packet) {
self.stream_state = TcpStreamState::Disconnected;
self.stream_state = TcpStreamState::reset();
return Err(ClientError::Send {
addr: self.addr,
err,
Expand All @@ -141,23 +153,40 @@ impl TcpClient {

/// Wait until all logged data have been sent.
pub fn flush(&mut self) {
re_log::trace!("Flushing TCP stream…");
if let TcpStreamState::Connected(stream) = &mut self.stream_state {
if let Err(err) = stream.flush() {
re_log::warn!("Failed to flush: {err}");
self.stream_state = TcpStreamState::Disconnected;
re_log::trace!("Attempting to flush TCP stream…");
match &mut self.stream_state {
TcpStreamState::Pending { .. } => {
re_log::warn!(
"Tried to flush while TCP stream was still Pending. Data was possibly dropped."
);
}
TcpStreamState::Connected(stream) => {
if let Err(err) = stream.flush() {
re_log::warn!("Failed to flush TCP stream: {err}");
self.stream_state = TcpStreamState::reset();
} else {
re_log::trace!("TCP stream flushed.");
}
}
}
re_log::trace!("TCP stream flushed.");
}

/// Check if the underlying [`TcpStream`] has entered the [`TcpStreamState::Disconnected`] state
/// Check if the underlying [`TcpStream`] is in the [`TcpStreamState::Pending`] state
/// and has reached the flush timeout threshold.
///
/// Note that this only occurs after a failure to connect or a failure to send.
pub fn has_disconnected(&self) -> bool {
pub fn has_timed_out_for_flush(&self) -> bool {
match self.stream_state {
TcpStreamState::Pending | TcpStreamState::Connected(_) => false,
TcpStreamState::Disconnected => true,
TcpStreamState::Pending {
start_time,
num_attempts,
} => {
// If a timeout wasn't provided, never timeout
self.flush_timeout.map_or(false, |timeout| {
Instant::now().duration_since(start_time) > timeout && num_attempts > 0
})
}
TcpStreamState::Connected(_) => false,
}
}
}
Loading

0 comments on commit 5b64e66

Please sign in to comment.