diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index 087999d24c..834710b55f 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -53,8 +53,11 @@ pub use error::{Error, use futures::sync::mpsc::UnboundedSender; use habitat_common::types::{AutomateAuthToken, EventStreamMetadata}; +use habitat_core::env::Config as EnvConfig; use state::Container; use std::{net::SocketAddr, + num::ParseIntError, + str::FromStr, sync::Once, time::Duration}; @@ -254,3 +257,28 @@ impl EventStream { } } } + +//////////////////////////////////////////////////////////////////////// + +/// How long should we for the event thread to start up before +/// abandoning it and shutting down? +#[derive(Clone, Debug)] +struct EventThreadStartupWait(u64); + +impl Default for EventThreadStartupWait { + fn default() -> Self { Self(5) } +} + +impl FromStr for EventThreadStartupWait { + type Err = ParseIntError; + + fn from_str(s: &str) -> ::std::result::Result { Ok(Self(s.parse()?)) } +} + +impl EnvConfig for EventThreadStartupWait { + const ENVVAR: &'static str = "HAB_EVENT_THREAD_STARTUP_WAIT_SEC"; +} + +impl Into for EventThreadStartupWait { + fn into(self) -> Duration { Duration::from_secs(self.0) } +} diff --git a/components/sup/src/event/nitox.rs b/components/sup/src/event/nitox.rs index cb3d1b537d..f0e0db5ea8 100644 --- a/components/sup/src/event/nitox.rs +++ b/components/sup/src/event/nitox.rs @@ -1,3 +1,4 @@ +use super::EventThreadStartupWait; use crate::event::{Error, EventConnectionInfo, EventStream, @@ -5,13 +6,13 @@ use crate::event::{Error, use futures::{sync::mpsc as futures_mpsc, Future, Stream}; +use habitat_core::env::Config as _; use nitox::{commands::ConnectCommand, streaming::client::NatsStreamingClient, NatsClient, NatsClientOptions}; use std::{sync::mpsc as std_mpsc, - thread, - time::Duration}; + thread}; use tokio::{executor, runtime::current_thread::Runtime as ThreadRuntime}; /// All messages are published under this subject. @@ -83,7 +84,7 @@ pub(super) fn init_stream(conn_info: EventConnectionInfo) -> Result }) .map_err(Error::SpawnEventThreadError)?; - sync_rx.recv_timeout(Duration::from_secs(5)) + sync_rx.recv_timeout(EventThreadStartupWait::configured_value().into()) .map_err(Error::ConnectEventServerError)?; Ok(EventStream(event_tx)) } diff --git a/components/sup/src/event/ratsio.rs b/components/sup/src/event/ratsio.rs index 120c8cc360..eb4dd1b6b1 100644 --- a/components/sup/src/event/ratsio.rs +++ b/components/sup/src/event/ratsio.rs @@ -1,3 +1,4 @@ +use super::EventThreadStartupWait; use crate::event::{Error, EventConnectionInfo, EventStream, @@ -5,13 +6,13 @@ use crate::event::{Error, use futures::{sync::mpsc as futures_mpsc, Future, Stream}; +use habitat_core::env::Config as _; use ratsio::{nats_client::NatsClientOptions, stan_client::{StanClient, StanMessage, StanOptions}}; use std::{sync::mpsc as std_mpsc, - thread, - time::Duration}; + thread}; use tokio::{executor, runtime::current_thread::Runtime as ThreadRuntime}; @@ -78,7 +79,7 @@ pub(super) fn init_stream(conn_info: EventConnectionInfo) -> Result }) .map_err(Error::SpawnEventThreadError)?; // TODO (CM): ratsio error variant - sync_rx.recv_timeout(Duration::from_secs(5)) + sync_rx.recv_timeout(EventThreadStartupWait::configured_value().into()) .map_err(Error::ConnectEventServerError)?; Ok(EventStream(event_tx)) }