From 5b64e6678a5f8cc227b1bcb552fdd87c252ea475 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 26 Jul 2023 12:01:58 -0400 Subject: [PATCH] Fix shutdown race-condition by introducing a flush_timeout before dropping data (#2821) Resolves: https://github.com/rerun-io/rerun/issues/2556 ### What We've always had a race condition during startup where a viewer might not exist despite a client trying to connect to it. For users of spawn we could have added a bandaid here by probably increasing this sleep: https://github.com/rerun-io/rerun/blob/main/rerun_py/rerun_sdk/rerun/sinks.py#L180 This wouldn't resolve issues for other use-cases of manually launching a viewer and client without tight orchestration. Instead, we introduce a timeout (which can optionally be set to None), to use during the disconnected checks when we are flushing. This gives power-users the option to specify None and reduce risk of losing data (at an increased risk of blocking during flush when things go wrong). This PR also bumps a couple of debug logs up to warnings to make it more clear when data is being dropped. ### Checklist * [x] I have read and agree to [Contributor Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and the [Code of Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md) * [x] I've included a screenshot or gif (if applicable) * [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/2821) (if applicable) - [PR Build Summary](https://build.rerun.io/pr/2821) - [Docs preview](https://rerun.io/preview/pr%3Ajleibs%2Ftimeout_disconnects/docs) - [Examples preview](https://rerun.io/preview/pr%3Ajleibs%2Ftimeout_disconnects/examples) --- crates/re_sdk/src/lib.rs | 2 +- crates/re_sdk/src/log_sink.rs | 8 +- crates/re_sdk/src/recording_stream.rs | 22 +++- crates/re_sdk_comms/src/buffered_client.rs | 37 ++++-- crates/re_sdk_comms/src/lib.rs | 6 + crates/re_sdk_comms/src/tcp_client.rs | 141 +++++++++++++-------- crates/rerun/src/clap.rs | 5 +- crates/rerun/src/lib.rs | 3 +- crates/rerun_c/src/lib.rs | 2 +- rerun_py/rerun_sdk/rerun/sinks.py | 10 +- rerun_py/src/python_bridge.rs | 16 ++- 11 files changed, 168 insertions(+), 84 deletions(-) diff --git a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index f75262eb8f77..6f4d098a3496 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -20,7 +20,7 @@ mod recording_stream; pub use self::msg_sender::{MsgSender, MsgSenderError}; pub use self::recording_stream::{RecordingStream, RecordingStreamBuilder}; -pub use re_sdk_comms::default_server_addr; +pub use re_sdk_comms::{default_flush_timeout, default_server_addr}; pub use re_log_types::{ApplicationId, EntityPath, LegacyComponent, StoreId, StoreKind}; pub use re_types::ComponentName; diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs index 1d5a619e7e66..88ae0e796727 100644 --- a/crates/re_sdk/src/log_sink.rs +++ b/crates/re_sdk/src/log_sink.rs @@ -192,10 +192,14 @@ pub struct TcpSink { impl TcpSink { /// Connect to the given address in a background thread. /// Retries until successful. + /// + /// `flush_timeout` is the minimum time the [`TcpSink`] will wait during a flush + /// before potentially dropping data. Note: Passing `None` here can cause a + /// call to `flush` to block indefinitely if a connection cannot be established. #[inline] - pub fn new(addr: std::net::SocketAddr) -> Self { + pub fn new(addr: std::net::SocketAddr, flush_timeout: Option) -> Self { Self { - client: re_sdk_comms::Client::new(addr), + client: re_sdk_comms::Client::new(addr, flush_timeout), } } } diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index e5f38c27e5b3..d699022c776c 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -219,20 +219,28 @@ impl RecordingStreamBuilder { /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a /// remote Rerun instance. /// + /// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will + /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a + /// call to `flush` to block indefinitely if a connection cannot be established. + /// /// ## Example /// /// ```no_run /// let rec_stream = re_sdk::RecordingStreamBuilder::new("my_app") - /// .connect(re_sdk::default_server_addr())?; + /// .connect(re_sdk::default_server_addr(), re_sdk::default_flush_timeout())?; /// # Ok::<(), Box>(()) /// ``` - pub fn connect(self, addr: std::net::SocketAddr) -> RecordingStreamResult { + pub fn connect( + self, + addr: std::net::SocketAddr, + flush_timeout: Option, + ) -> RecordingStreamResult { let (enabled, store_info, batcher_config) = self.into_args(); if enabled { RecordingStream::new( store_info, batcher_config, - Box::new(crate::log_sink::TcpSink::new(addr)), + Box::new(crate::log_sink::TcpSink::new(addr, flush_timeout)), ) } else { re_log::debug!("Rerun disabled - call to connect() ignored"); @@ -850,11 +858,15 @@ impl RecordingStream { /// Swaps the underlying sink for a [`crate::log_sink::TcpSink`] sink pre-configured to use /// the specified address. /// + /// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will + /// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a + /// call to `flush` to block indefinitely if a connection cannot be established. + /// /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in /// terms of data durability and ordering. /// See [`Self::set_sink`] for more information. - pub fn connect(&self, addr: std::net::SocketAddr) { - self.set_sink(Box::new(crate::log_sink::TcpSink::new(addr))); + pub fn connect(&self, addr: std::net::SocketAddr, flush_timeout: Option) { + self.set_sink(Box::new(crate::log_sink::TcpSink::new(addr, flush_timeout))); } /// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index 5647c0c59e06..0778931be75b 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -54,7 +54,12 @@ pub struct Client { impl Client { /// Connect via TCP to this log server. - pub fn new(addr: SocketAddr) -> Self { + /// + /// `flush_timeout` is the minimum time the `TcpClient` will wait during a + /// flush before potentially dropping data. Note: Passing `None` here can + /// cause a call to `flush` to block indefinitely if a connection cannot be + /// established. + pub fn new(addr: SocketAddr, flush_timeout: Option) -> Self { re_log::debug!("Connecting to remote {addr}…"); // TODO(emilk): keep track of how much memory is in each pipe @@ -88,7 +93,7 @@ impl Client { let send_join = std::thread::Builder::new() .name("tcp_sender".into()) .spawn(move || { - tcp_sender(addr, &packet_rx, &send_quit_rx, &flushed_tx); + tcp_sender(addr, flush_timeout, &packet_rx, &send_quit_rx, &flushed_tx); }) .expect("Failed to spawn thread"); @@ -250,11 +255,12 @@ fn msg_encode( fn tcp_sender( addr: SocketAddr, + flush_timeout: Option, packet_rx: &Receiver, quit_rx: &Receiver, flushed_tx: &Sender, ) { - let mut tcp_client = crate::tcp_client::TcpClient::new(addr); + let mut tcp_client = crate::tcp_client::TcpClient::new(addr, flush_timeout); // Once this flag has been set, we will drop all messages if the tcp_client is // no longer connected. let mut drop_if_disconnected = false; @@ -311,36 +317,47 @@ fn send_until_success( quit_rx: &Receiver, ) -> Option { // Early exit if tcp_client is disconnected - if drop_if_disconnected && tcp_client.has_disconnected() { - re_log::debug_once!("Dropping messages because we're disconnected."); + if drop_if_disconnected && tcp_client.has_timed_out_for_flush() { + re_log::warn_once!("Dropping messages because tcp client has timed out."); return None; } if let Err(err) = tcp_client.send(packet) { - if drop_if_disconnected { - re_log::debug_once!("Dropping messages because we're disconnected."); + if drop_if_disconnected && tcp_client.has_timed_out_for_flush() { + re_log::warn_once!("Dropping messages because tcp client has timed out."); return None; } // If this is the first time we fail to send the message, produce a warning. - re_log::warn!("Failed to send message: {err}"); + re_log::debug!("Failed to send message: {err}"); + let mut attempts = 1; let mut sleep_ms = 100; loop { select! { recv(quit_rx) -> _quit_msg => { - re_log::debug_once!("Dropping messages because we're disconnected or quitting."); + re_log::warn_once!("Dropping messages because tcp client has timed out or quitting."); return Some(_quit_msg.unwrap_or(InterruptMsg::Quit)); } default(std::time::Duration::from_millis(sleep_ms)) => { if let Err(new_err) = tcp_client.send(packet) { + attempts += 1; + if attempts == 3 { + re_log::warn!("Failed to send message after {attempts} attempts: {err}"); + } + + if drop_if_disconnected && tcp_client.has_timed_out_for_flush() { + re_log::warn_once!("Dropping messages because tcp client has timed out."); + return None; + } + const MAX_SLEEP_MS : u64 = 3000; sleep_ms = (sleep_ms * 2).min(MAX_SLEEP_MS); // Only produce subsequent warnings once we've saturated the back-off if sleep_ms == MAX_SLEEP_MS && new_err.to_string() != err.to_string() { - re_log::warn!("Still failing to send message: {err}"); + re_log::warn!("Still failing to send message after {attempts} attempts: {err}"); } } else { return None; diff --git a/crates/re_sdk_comms/src/lib.rs b/crates/re_sdk_comms/src/lib.rs index 6c9e2044292d..5e397e0876a5 100644 --- a/crates/re_sdk_comms/src/lib.rs +++ b/crates/re_sdk_comms/src/lib.rs @@ -27,3 +27,9 @@ pub const DEFAULT_SERVER_PORT: u16 = 9876; pub fn default_server_addr() -> std::net::SocketAddr { std::net::SocketAddr::from(([127, 0, 0, 1], DEFAULT_SERVER_PORT)) } + +/// The default amount of time to wait for the TCP connection to resume during a flush +#[allow(clippy::unnecessary_wraps)] +pub fn default_flush_timeout() -> Option { + Some(std::time::Duration::from_secs(2)) +} diff --git a/crates/re_sdk_comms/src/tcp_client.rs b/crates/re_sdk_comms/src/tcp_client.rs index 083ff86d8d17..ead2259f19a2 100644 --- a/crates/re_sdk_comms/src/tcp_client.rs +++ b/crates/re_sdk_comms/src/tcp_client.rs @@ -1,6 +1,7 @@ use std::{ io::Write, net::{SocketAddr, TcpStream}, + time::{Duration, Instant}, }; #[derive(thiserror::Error, Debug)] @@ -21,36 +22,41 @@ pub enum ClientError { /// State of the [`TcpStream`] /// /// Because the [`TcpClient`] lazily connects on [`TcpClient::send`], it needs a -/// very simple state machine to track the state of the connection. A trinary -/// state is used to specifically differentiate between -/// [`TcpStreamState::Pending`] which is still a nominal state for any new tcp -/// connection, and [`TcpStreamState::Disconnected`] which implies either a -/// failure to connect, or an error on an already established stream. +/// very simple state machine to track the state of the connection. +/// [`TcpStreamState::Pending`] is the nominal state for any new TCP connection +/// when we successfully connect, we transition to [`TcpStreamState::Connected`]. enum TcpStreamState { /// The [`TcpStream`] is yet to be connected. /// + /// Tracks the duration and connection attempts since the last time the client + /// was `Connected.` + /// /// Behavior: Try to connect on next [`TcpClient::connect`] or [`TcpClient::send`]. /// /// Transitions: /// - Pending -> Connected on successful connection. - /// - Pending -> Disconnected on failed connection. - Pending, + /// - Pending -> Pending on failed connection. + Pending { + start_time: Instant, + num_attempts: usize, + }, /// A healthy [`TcpStream`] ready to send packets /// /// Behavior: Send packets on [`TcpClient::send`] /// /// Transitions: - /// - Connected -> Disconnected on send error + /// - Connected -> Pending on send error Connected(TcpStream), +} - /// A broken [`TcpStream`] which experienced a failure to connect or send. - /// - /// Behavior: Try to re-connect on next [`TcpClient::connect`] or [`TcpClient::send`] - /// - /// Transitions: - /// - Disconnected -> Connected on successful connection. - Disconnected, +impl TcpStreamState { + fn reset() -> Self { + Self::Pending { + start_time: Instant::now(), + num_attempts: 0, + } + } } /// Connect to a rerun server and send log messages. @@ -59,19 +65,15 @@ enum TcpStreamState { pub struct TcpClient { addr: SocketAddr, stream_state: TcpStreamState, -} - -impl Default for TcpClient { - fn default() -> Self { - Self::new(crate::default_server_addr()) - } + flush_timeout: Option, } impl TcpClient { - pub fn new(addr: SocketAddr) -> Self { + pub fn new(addr: SocketAddr, flush_timeout: Option) -> Self { Self { addr, - stream_state: TcpStreamState::Pending, + stream_state: TcpStreamState::reset(), + flush_timeout, } } @@ -79,32 +81,42 @@ impl TcpClient { /// /// [`Self::send`] will call this. pub fn connect(&mut self) -> Result<(), ClientError> { - if let TcpStreamState::Connected(_) = self.stream_state { - Ok(()) - } else { - re_log::debug!("Connecting to {:?}…", self.addr); - let timeout = std::time::Duration::from_secs(5); - match TcpStream::connect_timeout(&self.addr, timeout) { - Ok(mut stream) => { - re_log::debug!("Connected to {:?}.", self.addr); - if let Err(err) = stream.write(&crate::PROTOCOL_VERSION.to_le_bytes()) { - self.stream_state = TcpStreamState::Disconnected; - Err(ClientError::Send { + match self.stream_state { + TcpStreamState::Connected(_) => Ok(()), + TcpStreamState::Pending { + start_time, + num_attempts, + } => { + re_log::debug!("Connecting to {:?}…", self.addr); + let timeout = std::time::Duration::from_secs(5); + match TcpStream::connect_timeout(&self.addr, timeout) { + Ok(mut stream) => { + re_log::debug!("Connected to {:?}.", self.addr); + if let Err(err) = stream.write(&crate::PROTOCOL_VERSION.to_le_bytes()) { + self.stream_state = TcpStreamState::Pending { + start_time, + num_attempts: num_attempts + 1, + }; + Err(ClientError::Send { + addr: self.addr, + err, + }) + } else { + self.stream_state = TcpStreamState::Connected(stream); + Ok(()) + } + } + Err(err) => { + self.stream_state = TcpStreamState::Pending { + start_time, + num_attempts: num_attempts + 1, + }; + Err(ClientError::Connect { addr: self.addr, err, }) - } else { - self.stream_state = TcpStreamState::Connected(stream); - Ok(()) } } - Err(err) => { - self.stream_state = TcpStreamState::Disconnected; - Err(ClientError::Connect { - addr: self.addr, - err, - }) - } } } } @@ -118,7 +130,7 @@ impl TcpClient { if let TcpStreamState::Connected(stream) = &mut self.stream_state { re_log::trace!("Sending a packet of size {}…", packet.len()); if let Err(err) = stream.write(&(packet.len() as u32).to_le_bytes()) { - self.stream_state = TcpStreamState::Disconnected; + self.stream_state = TcpStreamState::reset(); return Err(ClientError::Send { addr: self.addr, err, @@ -126,7 +138,7 @@ impl TcpClient { } if let Err(err) = stream.write(packet) { - self.stream_state = TcpStreamState::Disconnected; + self.stream_state = TcpStreamState::reset(); return Err(ClientError::Send { addr: self.addr, err, @@ -141,23 +153,40 @@ impl TcpClient { /// Wait until all logged data have been sent. pub fn flush(&mut self) { - re_log::trace!("Flushing TCP stream…"); - if let TcpStreamState::Connected(stream) = &mut self.stream_state { - if let Err(err) = stream.flush() { - re_log::warn!("Failed to flush: {err}"); - self.stream_state = TcpStreamState::Disconnected; + re_log::trace!("Attempting to flush TCP stream…"); + match &mut self.stream_state { + TcpStreamState::Pending { .. } => { + re_log::warn!( + "Tried to flush while TCP stream was still Pending. Data was possibly dropped." + ); + } + TcpStreamState::Connected(stream) => { + if let Err(err) = stream.flush() { + re_log::warn!("Failed to flush TCP stream: {err}"); + self.stream_state = TcpStreamState::reset(); + } else { + re_log::trace!("TCP stream flushed."); + } } } - re_log::trace!("TCP stream flushed."); } - /// Check if the underlying [`TcpStream`] has entered the [`TcpStreamState::Disconnected`] state + /// Check if the underlying [`TcpStream`] is in the [`TcpStreamState::Pending`] state + /// and has reached the flush timeout threshold. /// /// Note that this only occurs after a failure to connect or a failure to send. - pub fn has_disconnected(&self) -> bool { + pub fn has_timed_out_for_flush(&self) -> bool { match self.stream_state { - TcpStreamState::Pending | TcpStreamState::Connected(_) => false, - TcpStreamState::Disconnected => true, + TcpStreamState::Pending { + start_time, + num_attempts, + } => { + // If a timeout wasn't provided, never timeout + self.flush_timeout.map_or(false, |timeout| { + Instant::now().duration_since(start_time) > timeout && num_attempts > 0 + }) + } + TcpStreamState::Connected(_) => false, } } } diff --git a/crates/rerun/src/clap.rs b/crates/rerun/src/clap.rs index 59f75093642f..94c046aa497c 100644 --- a/crates/rerun/src/clap.rs +++ b/crates/rerun/src/clap.rs @@ -102,7 +102,10 @@ impl RerunArgs { } let sink: Box = match self.to_behavior()? { - RerunBehavior::Connect(addr) => Box::new(crate::sink::TcpSink::new(addr)), + RerunBehavior::Connect(addr) => Box::new(crate::sink::TcpSink::new( + addr, + crate::default_flush_timeout(), + )), RerunBehavior::Save(path) => Box::new(crate::sink::FileSink::new(path)?), diff --git a/crates/rerun/src/lib.rs b/crates/rerun/src/lib.rs index 050c47634f72..e02b0d672bcc 100644 --- a/crates/rerun/src/lib.rs +++ b/crates/rerun/src/lib.rs @@ -61,7 +61,8 @@ //! Then do this: //! //! ```no_run -//! let rec_stream = rerun::RecordingStreamBuilder::new("my_app").connect(rerun::default_server_addr()); +//! let rec_stream = rerun::RecordingStreamBuilder::new("my_app") +//! .connect(rerun::default_server_addr(), rerun::default_flush_timeout()); //! ``` //! //! #### Buffering diff --git a/crates/rerun_c/src/lib.rs b/crates/rerun_c/src/lib.rs index 22a7cf04d1ec..9644e44a56b0 100644 --- a/crates/rerun_c/src/lib.rs +++ b/crates/rerun_c/src/lib.rs @@ -141,7 +141,7 @@ pub unsafe extern "C" fn rr_recording_stream_new( .expect("invalid tcp_addr string") .parse() .expect("invalid tcp_addr"); - let sink = Box::new(TcpSink::new(tcp_addr)); + let sink = Box::new(TcpSink::new(tcp_addr, re_sdk::default_flush_timeout())); let rec_stream = RecordingStream::new(store_info, batcher_config, sink).unwrap(); diff --git a/rerun_py/rerun_sdk/rerun/sinks.py b/rerun_py/rerun_sdk/rerun/sinks.py index 6cf13c96b537..87a1f21a5bfc 100644 --- a/rerun_py/rerun_sdk/rerun/sinks.py +++ b/rerun_py/rerun_sdk/rerun/sinks.py @@ -10,7 +10,9 @@ # --- Sinks --- -def connect(addr: str | None = None, recording: RecordingStream | None = None) -> None: +def connect( + addr: str | None = None, flush_timeout_sec: float | None = 2.0, recording: RecordingStream | None = None +) -> None: """ Connect to a remote Rerun Viewer on the given ip:port. @@ -22,6 +24,10 @@ def connect(addr: str | None = None, recording: RecordingStream | None = None) - ---------- addr The ip:port to connect to + flush_timeout_sec: float + The minimum time the SDK will wait during a flush before potentially + dropping data if progress is not being made. Passing `None` indicates no timeout, + and can cause a call to `flush` to block indefinitely. recording: Specifies the [`rerun.RecordingStream`][] to use. If left unspecified, defaults to the current active data recording, if there is one. @@ -29,7 +35,7 @@ def connect(addr: str | None = None, recording: RecordingStream | None = None) - """ recording = RecordingStream.to_native(recording) - bindings.connect(addr=addr, recording=recording) + bindings.connect(addr=addr, flush_timeout_sec=flush_timeout_sec, recording=recording) _connect = connect # we need this because Python scoping is horrible diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index f5eb2cba6889..67b6d7753752 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -489,24 +489,30 @@ fn is_enabled(recording: Option<&PyRecordingStream>) -> bool { } #[pyfunction] -#[pyo3(signature = (addr = None, recording = None))] -fn connect(addr: Option, recording: Option<&PyRecordingStream>) -> PyResult<()> { +#[pyo3(signature = (addr = None, flush_timeout_sec=rerun::default_flush_timeout().unwrap().as_secs_f32(), recording = None))] +fn connect( + addr: Option, + flush_timeout_sec: Option, + recording: Option<&PyRecordingStream>, +) -> PyResult<()> { let addr = if let Some(addr) = addr { addr.parse()? } else { rerun::default_server_addr() }; + let flush_timeout = flush_timeout_sec.map(std::time::Duration::from_secs_f32); + if let Some(recording) = recording { // If the user passed in a recording, use it - recording.connect(addr); + recording.connect(addr, flush_timeout); } else { // Otherwise, connect both global defaults if let Some(recording) = get_data_recording(None) { - recording.connect(addr); + recording.connect(addr, flush_timeout); }; if let Some(blueprint) = get_blueprint_recording(None) { - blueprint.connect(addr); + blueprint.connect(addr, flush_timeout); }; }