diff --git a/Cargo.lock b/Cargo.lock index f6dd459496a..c5c08d9ee03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,3 +1,5 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. [[package]] name = "MacTypes-sys" version = "2.1.0" diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index 082b9e51533..a689e9073cb 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -32,6 +32,7 @@ use self::types::{EventMessage, HealthCheckEvent, ServiceStartedEvent, ServiceStoppedEvent}; +use crate::AutomateAuthToken; use crate::{error::Result, manager::{service::{HealthCheck, Service}, @@ -73,10 +74,11 @@ lazy_static! { /// static reference for access later. pub fn init_stream(conn_info: EventConnectionInfo, event_core: EventCore) { INIT.call_once(|| { - let event_stream = init_nats_stream(conn_info).expect("Could not start NATS thread"); - EVENT_STREAM.set(event_stream); - EVENT_CORE.set(event_core); - }); + println!("automate auth token is {}", conn_info.auth_token.as_str()); + let event_stream = init_nats_stream(conn_info).expect("Could not start NATS thread"); + EVENT_STREAM.set(event_stream); + EVENT_CORE.set(event_core); + }); } /// Captures all event stream-related configuration options that would @@ -106,10 +108,11 @@ impl EventStreamConfig { // TODO: This will change as we firm up what the interaction between // Habitat and A2 looks like. pub struct EventConnectionInfo { - pub name: String, - pub verbose: bool, + pub name: String, + pub verbose: bool, pub cluster_uri: String, - pub cluster_id: String, + pub cluster_id: String, + pub auth_token: AutomateAuthToken, } /// A collection of data that will be present in all events. Rather @@ -232,10 +235,14 @@ const HABITAT_SUBJECT: &str = "habitat"; // prototyping, though. impl Default for EventConnectionInfo { fn default() -> Self { - EventConnectionInfo { name: String::from("habitat"), - verbose: true, - cluster_uri: String::from("127.0.0.1:4223"), - cluster_id: String::from("test-cluster"), } + EventConnectionInfo { + name: String::from("habitat"), + verbose: true, + cluster_uri: String::from("127.0.0.1:4223"), + cluster_id: String::from("test-cluster"), + // DON'T LEAVE THIS ADMIN TOKEN IN HERE! 
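+ // This hard-coded value is purely a prototyping convenience; the real token + // comes from the HAB_AUTOMATE_AUTH_TOKEN environment variable via + // AutomateAuthToken::from_env() (see components/sup/src/lib.rs below).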
+ auth_token: AutomateAuthToken("D6fHxsfc_FlGG4coaZXdNv-vSUM=".to_string()), + } } } @@ -248,28 +255,38 @@ fn init_nats_stream(conn_info: EventConnectionInfo) -> Result<EventStream> { // it in the Supervisor's Tokio runtime, but there's currently a // bug: https://github.com/YellowInnovation/nitox/issues/24 - thread::Builder::new().name("events".to_string()) - .spawn(move || { - let EventConnectionInfo { name, - verbose, - cluster_uri, - cluster_id, } = conn_info; + thread::Builder::new() + .name("events".to_string()) + .spawn(move || { + let EventConnectionInfo { + name, + verbose, + cluster_uri, + cluster_id, + auth_token, + } = conn_info; - let cc = ConnectCommand::builder() // .user(Some("nats".to_string())) // .pass(Some("S3Cr3TP@5w0rD".to_string())) .name(Some(name)) .verbose(verbose) + .auth_token(Some(auth_token.as_str().to_string())) + .tls_required(false) + .build() + .unwrap(); + let opts = NatsClientOptions::builder() + .connect_command(cc) + .cluster_uri(cluster_uri.as_str()) .build() .unwrap(); - let opts = - NatsClientOptions::builder().connect_command(cc) - .cluster_uri(cluster_uri.as_str()) - .build() - .unwrap(); - let publisher = NatsClient::from_options(opts) - .map_err(Into::::into) + let publisher = NatsClient::from_options(opts) + .map_err(|e| { + error!("Error creating Nats Client from options: {}", e); + //Into::::into(e) + e.into() + }) .and_then(|client| { NatsStreamingClient::from(client) .cluster_id(cluster_id) @@ -289,12 +306,13 @@ fn init_nats_stream(conn_info: EventConnectionInfo) -> Result<EventStream> { }) }); - ThreadRuntime::new().expect("Couldn't create event stream runtime!") - .spawn(publisher) - .run() - .expect("something seriously wrong has occurred"); - }) - .expect("Couldn't start events thread!"); + ThreadRuntime::new() + .expect("Couldn't create event stream runtime!") + .spawn(publisher) + .run() + .expect("something seriously wrong has occurred"); + }) + .expect("Couldn't start events thread!"); sync_rx.recv()?; // TODO (CM): nicer error message Ok(EventStream(event_tx)) diff --git a/components/sup/src/lib.rs b/components/sup/src/lib.rs index 8a58dc4fcb7..a98d2068e65 100644 --- a/components/sup/src/lib.rs +++ b/components/sup/src/lib.rs @@ -96,7 +96,44 @@ mod sys; pub mod test_helpers; pub mod util; -use std::env; +use std::{env, env::VarError}; pub const PRODUCT: &str = "hab-sup"; pub const VERSION: &str = include_str!(concat!(env!("OUT_DIR"), "/VERSION")); + +/// This represents an environment variable that holds an authentication token which enables +/// integration with Automate. Supervisors use this token to connect to the messaging server +/// on the Automate side in order to send data about the services they're running via event +/// messages. If the environment variable is present, its value is the auth token. If it's not +/// present and the feature flag for the Event Stream is enabled, initialization of the Event +/// Stream will fail. +#[derive(Debug)] +pub struct AutomateAuthToken(String); + +// TODO: @gcp figure out if env::Config trait is appropriate here (for clap, etc.) +// impl env::Config for AutomateAuthToken { +// const ENVVAR: &'static str = "HAB_AUTOMATE_AUTH_TOKEN"; +// } + +impl AutomateAuthToken { + // TODO: @gcp make a real error type for the case where there's no auth token value + // refactor: to_string_lossy doesn't return an error if it can't convert the OsString + fn from_env() -> Result<AutomateAuthToken, VarError> { + // unwrap won't fail; any error would arise from env::var()?
(from_str currently doesn't return an error) + // we probably won't keep unwrap long-term + println!("getting automate auth token from env..."); + Ok(env::var("HAB_AUTOMATE_AUTH_TOKEN")?.parse().unwrap()) + } + + fn as_str(&self) -> &str { + self.0.as_str() + } +} + +impl std::str::FromStr for AutomateAuthToken { + type Err = (); + + fn from_str(s: &str) -> std::result::Result { + Ok(AutomateAuthToken(s.to_string())) + } +} diff --git a/components/sup/src/manager/mod.rs b/components/sup/src/manager/mod.rs index 0571b6ea726..a5ef28c5bb3 100644 --- a/components/sup/src/manager/mod.rs +++ b/components/sup/src/manager/mod.rs @@ -26,113 +26,86 @@ mod spec_watcher; pub(crate) mod sys; mod user_config_watcher; -use self::{peer_watcher::PeerWatcher, - self_updater::{SelfUpdater, - SUP_PKG_IDENT}, - service::{ConfigRendering, - DesiredState, - HealthCheck, - Service, - ServiceProxy, - ServiceSpec, - Topology}, - service_updater::ServiceUpdater, - spec_dir::SpecDir, - spec_watcher::SpecWatcher, - sys::Sys, - user_config_watcher::UserConfigWatcher}; -use crate::{census::{CensusRing, - CensusRingProxy}, - config::GossipListenAddr, - ctl_gateway::{self, - CtlRequest}, - error::{Error, - Result, - SupError}, - event::{self, - EventConnectionInfo, - EventCore, - EventStreamConfig}, - http_gateway, - VERSION}; +use self::{ + peer_watcher::PeerWatcher, + self_updater::{SelfUpdater, SUP_PKG_IDENT}, + service::{ + ConfigRendering, DesiredState, HealthCheck, Service, ServiceProxy, ServiceSpec, Topology, + }, + service_updater::ServiceUpdater, + spec_dir::SpecDir, + spec_watcher::SpecWatcher, + sys::Sys, + user_config_watcher::UserConfigWatcher, +}; +use crate::AutomateAuthToken; +use crate::{ + census::{CensusRing, CensusRingProxy}, + config::GossipListenAddr, + ctl_gateway::{self, CtlRequest}, + error::{Error, Result, SupError}, + event::{self, EventConnectionInfo, EventCore, EventStreamConfig}, + http_gateway, VERSION, +}; use cpu_time::ProcessTime; -use futures::{future, - prelude::*, - sync::{mpsc, - oneshot}}; -use habitat_butterfly::{member::Member, - server::{timing::Timing, - ServerProxy, - Suitability}, - trace::Trace}; -use habitat_common::{outputln, - types::ListenCtlAddr, - FeatureFlag}; -use habitat_core::{crypto::SymKey, - env::{self, - Config}, - fs::FS_ROOT_PATH, - os::{process::{self, - Pid, - Signal}, - signals::{self, - SignalEvent}}, - package::{Identifiable, - PackageIdent, - PackageInstall}, - service::ServiceGroup, - util::ToI64, - ChannelIdent}; -use habitat_launcher_client::{LauncherCli, - LAUNCHER_LOCK_CLEAN_ENV, - LAUNCHER_PID_ENV}; +use futures::{ + future, + prelude::*, + sync::{mpsc, oneshot}, +}; +use habitat_butterfly::{ + member::Member, + server::{timing::Timing, ServerProxy, Suitability}, + trace::Trace, +}; +use habitat_common::{outputln, types::ListenCtlAddr, FeatureFlag}; +use habitat_core::{ + crypto::SymKey, + env::{self, Config}, + fs::FS_ROOT_PATH, + os::{ + process::{self, Pid, Signal}, + signals::{self, SignalEvent}, + }, + package::{Identifiable, PackageIdent, PackageInstall}, + service::ServiceGroup, + util::ToI64, + ChannelIdent, +}; +use habitat_launcher_client::{LauncherCli, LAUNCHER_LOCK_CLEAN_ENV, LAUNCHER_PID_ENV}; use habitat_sup_protocol; use num_cpus; #[cfg(unix)] use proc_self; -use prometheus::{HistogramVec, - IntGauge, - IntGaugeVec}; -use rustls::{internal::pemfile, - AllowAnyAuthenticatedClient, - NoClientAuth, - RootCertStore, - ServerConfig}; +use prometheus::{HistogramVec, IntGauge, IntGaugeVec}; +use rustls::{ + internal::pemfile, 
AllowAnyAuthenticatedClient, NoClientAuth, RootCertStore, ServerConfig, +}; use serde_json; -use std::{self, - collections::{HashMap, - HashSet}, - fs::{self, - File, - OpenOptions}, - io::{BufRead, - BufReader, - Read, - Write}, - iter::IntoIterator, - net::SocketAddr, - path::{Path, - PathBuf}, - result, - str::FromStr, - sync::{atomic::{AtomicBool, - Ordering}, - Arc, - Condvar, - Mutex, - RwLock}, - thread, - time::Duration}; -use time::{self, - Duration as TimeDuration, - SteadyTime, - Timespec}; -use tokio::{executor, - runtime::{Builder as RuntimeBuilder, - Runtime}}; +use std::{ + self, + collections::{HashMap, HashSet}, + fs::{self, File, OpenOptions}, + io::{BufRead, BufReader, Read, Write}, + iter::IntoIterator, + net::SocketAddr, + path::{Path, PathBuf}, + result, + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Condvar, Mutex, RwLock, + }, + thread, + time::Duration, +}; +use time::{self, Duration as TimeDuration, SteadyTime, Timespec}; +use tokio::{ + executor, + runtime::{Builder as RuntimeBuilder, Runtime}, +}; #[cfg(windows)] -use winapi::{shared::minwindef::PDWORD, - um::processthreadsapi}; +use winapi::{shared::minwindef::PDWORD, um::processthreadsapi}; const MEMBER_ID_FILE: &str = "MEMBER_ID"; pub const PROC_LOCK_FILE: &str = "LOCK"; @@ -140,21 +113,29 @@ pub const PROC_LOCK_FILE: &str = "LOCK"; static LOGKEY: &'static str = "MR"; lazy_static! { - static ref RUN_LOOP_DURATION: HistogramVec = - register_histogram_vec!("hab_sup_run_loop_duration_seconds", - "The time it takes for one tick of a run loop", - &["loop"]).unwrap(); + static ref RUN_LOOP_DURATION: HistogramVec = register_histogram_vec!( + "hab_sup_run_loop_duration_seconds", + "The time it takes for one tick of a run loop", + &["loop"] + ) + .unwrap(); static ref FILE_DESCRIPTORS: IntGauge = register_int_gauge!( "hab_sup_open_file_descriptors_total", "A count of the total number of open file descriptors. 
Unix only" - ).unwrap(); - static ref MEMORY_STATS: IntGaugeVec = - register_int_gauge_vec!("hab_sup_memory_allocations_bytes", - "Memory allocation statistics", - &["category"]).unwrap(); - static ref CPU_TIME: IntGauge = register_int_gauge!("hab_sup_cpu_time_nanoseconds", - "CPU time of the supervisor process in \ - nanoseconds").unwrap(); + ) + .unwrap(); + static ref MEMORY_STATS: IntGaugeVec = register_int_gauge_vec!( + "hab_sup_memory_allocations_bytes", + "Memory allocation statistics", + &["category"] + ) + .unwrap(); + static ref CPU_TIME: IntGauge = register_int_gauge!( + "hab_sup_cpu_time_nanoseconds", + "CPU time of the supervisor process in \ + nanoseconds" + ) + .unwrap(); } #[derive(Debug, Clone, PartialEq, Eq)] @@ -163,7 +144,7 @@ enum ServiceOperation { Start(ServiceSpec), Stop(ServiceSpec), Restart { - to_stop: ServiceSpec, + to_stop: ServiceSpec, to_start: ServiceSpec, }, } @@ -191,50 +172,53 @@ enum ShutdownMode { pub struct FsCfg { pub sup_root: PathBuf, - data_path: PathBuf, - specs_path: PathBuf, + data_path: PathBuf, + specs_path: PathBuf, member_id_file: PathBuf, proc_lock_file: PathBuf, } impl FsCfg { fn new(sup_root: T) -> Self - where T: Into + where + T: Into, { let sup_root = sup_root.into(); - FsCfg { specs_path: sup_root.join("specs"), - data_path: sup_root.join("data"), - member_id_file: sup_root.join(MEMBER_ID_FILE), - proc_lock_file: sup_root.join(PROC_LOCK_FILE), - sup_root } + FsCfg { + specs_path: sup_root.join("specs"), + data_path: sup_root.join("data"), + member_id_file: sup_root.join(MEMBER_ID_FILE), + proc_lock_file: sup_root.join(PROC_LOCK_FILE), + sup_root, + } } } #[derive(Clone, Debug)] pub struct ManagerConfig { - pub auto_update: bool, - pub custom_state_path: Option, - pub cache_key_path: PathBuf, - pub update_url: String, - pub update_channel: ChannelIdent, - pub gossip_listen: GossipListenAddr, - pub ctl_listen: ListenCtlAddr, - pub http_listen: http_gateway::ListenAddr, - pub http_disable: bool, - pub gossip_peers: Vec, - pub gossip_permanent: bool, - pub ring_key: Option, - pub organization: Option, - pub watch_peer_file: Option, - pub tls_config: Option, - pub feature_flags: FeatureFlag, + pub auto_update: bool, + pub custom_state_path: Option, + pub cache_key_path: PathBuf, + pub update_url: String, + pub update_channel: ChannelIdent, + pub gossip_listen: GossipListenAddr, + pub ctl_listen: ListenCtlAddr, + pub http_listen: http_gateway::ListenAddr, + pub http_disable: bool, + pub gossip_peers: Vec, + pub gossip_permanent: bool, + pub ring_key: Option, + pub organization: Option, + pub watch_peer_file: Option, + pub tls_config: Option, + pub feature_flags: FeatureFlag, pub event_stream_config: Option, } #[derive(Clone, Debug)] pub struct TLSConfig { - pub cert_path: PathBuf, - pub key_path: PathBuf, + pub cert_path: PathBuf, + pub key_path: PathBuf, pub ca_cert_path: Option, } @@ -285,7 +269,9 @@ impl env::Config for GatewayAuthToken { struct ReconciliationFlag(Arc); impl ReconciliationFlag { - fn new(value: bool) -> Self { ReconciliationFlag(Arc::new(AtomicBool::new(value))) } + fn new(value: bool) -> Self { + ReconciliationFlag(Arc::new(AtomicBool::new(value))) + } /// Called after a service has finished some asynchronous /// operation to signal that we need to take a look at their spec @@ -296,9 +282,13 @@ impl ReconciliationFlag { /// We used `Ordering::Relaxed` here because there isn't a need to /// sequence operations for multiple actors setting the value to /// `true`. 
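/// /// A usage sketch (illustrative only, not part of this diff): /// ```ignore /// let flag = ReconciliationFlag::new(false); /// flag.set(); // an asynchronous service operation just finished /// if flag.toggle_if_set() { /// // re-examine spec files exactly once; the flag is false again /// } /// ```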
- fn set(&self) { self.0.store(true, Ordering::Relaxed); } + fn set(&self) { + self.0.store(true, Ordering::Relaxed); + } - fn is_set(&self) -> bool { self.0.load(Ordering::Relaxed) } + fn is_set(&self) -> bool { + self.0.load(Ordering::Relaxed) + } /// Returns whether or not we need to re-examine spec files in /// response to some service having finished an asynchronous @@ -321,7 +311,9 @@ impl ReconciliationFlag { /// one place seemed the prudent choice. In the long-term, we /// should be able to dispense with this altogether once we're all /// asynchronous. - fn toggle_if_set(&self) -> bool { self.0.compare_and_swap(true, false, Ordering::Relaxed) } + fn toggle_if_set(&self) -> bool { + self.0.compare_and_swap(true, false, Ordering::Relaxed) + } } /// This struct encapsulates the shared state for the supervisor. It's worth noting that if there's @@ -336,20 +328,20 @@ pub struct ManagerState { #[derive(Debug, Default)] pub struct GatewayState { - pub census_data: String, - pub butterfly_data: String, - pub services_data: String, + pub census_data: String, + pub butterfly_data: String, + pub services_data: String, pub health_check_data: HashMap, - pub auth_token: Option, + pub auth_token: Option, } pub struct Manager { - pub state: Arc, - butterfly: habitat_butterfly::Server, - census_ring: CensusRing, - fs_cfg: Arc, - launcher: LauncherCli, - updater: Arc>, + pub state: Arc, + butterfly: habitat_butterfly::Server, + census_ring: CensusRing, + fs_cfg: Arc, + launcher: LauncherCli, + updater: Arc>, peer_watcher: Option, spec_watcher: SpecWatcher, // This Arc> business is a potentially temporary @@ -362,12 +354,12 @@ pub struct Manager { // other threads (e.g., maybe we subscribe to messages to change // the watcher) user_config_watcher: Arc>, - spec_dir: SpecDir, - organization: Option, - self_updater: Option, - service_states: HashMap, - sys: Arc, - http_disable: bool, + spec_dir: SpecDir, + organization: Option, + self_updater: Option, + service_states: HashMap, + sys: Arc, + http_disable: bool, /// Collects the identifiers of all services that are currently /// doing something asynchronously (like shutting down, or running @@ -423,7 +415,11 @@ impl Manager { let cfg_static = cfg.clone(); let self_updater = if cfg.auto_update { if current.fully_qualified() { - Some(SelfUpdater::new(current, cfg.update_url, cfg.update_channel)) + Some(SelfUpdater::new( + current, + cfg.update_url, + cfg.update_channel, + )) } else { warn!("Supervisor version not fully qualified, unable to start self-updater"); None @@ -431,10 +427,12 @@ impl Manager { } else { None }; - let mut sys = Sys::new(cfg.gossip_permanent, - cfg.gossip_listen, - cfg.ctl_listen, - cfg.http_listen); + let mut sys = Sys::new( + cfg.gossip_permanent, + cfg.gossip_listen, + cfg.ctl_listen, + cfg.http_listen, + ); let member = Self::load_member(&mut sys, &fs_cfg)?; let services = Arc::new(RwLock::new(HashMap::new())); @@ -442,14 +440,16 @@ impl Manager { let mut gateway_state = GatewayState::default(); gateway_state.auth_token = gateway_auth_token.0; - let server = habitat_butterfly::Server::new(sys.gossip_listen(), - sys.gossip_listen(), - member, - Trace::default(), - cfg.ring_key, - None, - Some(&fs_cfg.data_path), - Box::new(SuitabilityLookup(services.clone())))?; + let server = habitat_butterfly::Server::new( + sys.gossip_listen(), + sys.gossip_listen(), + member, + Trace::default(), + cfg.ring_key, + None, + Some(&fs_cfg.data_path), + Box::new(SuitabilityLookup(services.clone())), + )?; outputln!("Supervisor Member-ID {}", 
sys.member_id); for peer_addr in &cfg.gossip_peers { let mut peer = Member::default(); @@ -472,39 +472,63 @@ impl Manager { if cfg.feature_flags.contains(FeatureFlag::EVENT_STREAM) { // Putting configuration of the stream behind a feature - // flag for now. If the flag isn't set, just don't + // flag for now. If the flag isn't set, just don't // initialize the stream; everything else will turn into a // no-op automatically. - let es_config = - cfg.event_stream_config - .expect("Config should be present if the EventStream feature is enabled"); - let ec = EventCore::new(es_config, &sys); + // TODO: Determine what the actual connection parameters // should be, and process them at some point before here. - event::init_stream(EventConnectionInfo::default(), ec); + let es_config = cfg + .event_stream_config + .expect("Config should be present if the EventStream feature is enabled"); + let ec = EventCore::new(es_config, &sys); + // unwrap is a stopgap: from_env() returns an error when the + // HAB_AUTOMATE_AUTH_TOKEN environment variable isn't set, and for + // now we simply panic on that here. + let c = AutomateAuthToken::from_env().unwrap(); + event::init_stream(Self::init_conn_info(c), ec); } - Ok(Manager { state: Arc::new(ManagerState { cfg: cfg_static, - services, - gateway_state: - Arc::new(RwLock::new(gateway_state)) }), - self_updater, - updater: Arc::new(Mutex::new(ServiceUpdater::new(server.clone()))), - census_ring: CensusRing::new(sys.member_id.clone()), - butterfly: server, - launcher, - peer_watcher, - spec_watcher, - user_config_watcher: Arc::new(RwLock::new(UserConfigWatcher::new())), - spec_dir, - fs_cfg: Arc::new(fs_cfg), - organization: cfg.organization, - service_states: HashMap::new(), - sys: Arc::new(sys), - http_disable: cfg.http_disable, - busy_services: Arc::new(Mutex::new(HashSet::new())), - services_need_reconciliation: ReconciliationFlag::new(false), - feature_flags: cfg.feature_flags }) + Ok(Manager { + state: Arc::new(ManagerState { + cfg: cfg_static, + services, + gateway_state: Arc::new(RwLock::new(gateway_state)), + }), + self_updater, + updater: Arc::new(Mutex::new(ServiceUpdater::new(server.clone()))), + census_ring: CensusRing::new(sys.member_id.clone()), + butterfly: server, + launcher, + peer_watcher, + spec_watcher, + user_config_watcher: Arc::new(RwLock::new(UserConfigWatcher::new())), + spec_dir, + fs_cfg: Arc::new(fs_cfg), + organization: cfg.organization, + service_states: HashMap::new(), + sys: Arc::new(sys), + http_disable: cfg.http_disable, + busy_services: Arc::new(Mutex::new(HashSet::new())), + services_need_reconciliation: ReconciliationFlag::new(false), + feature_flags: cfg.feature_flags, + }) + } + + /// Initialize struct containing the connection information required to + /// connect to the Automate messaging server: subject, verbosity, messaging + /// cluster id and uri, and authentication token. + fn init_conn_info(msg_auth_token: AutomateAuthToken) -> event::EventConnectionInfo { + // Messaging connection information is hard-coded for now, + // with the exception of the Automate authentication token. + // TODO: Determine what the actual connection parameters + // should be, and process them at some point before here. + EventConnectionInfo { + name: String::from("habitat"), + verbose: true, + cluster_uri: String::from("10.0.0.174:4222"), + cluster_id: String::from("event-service"), + auth_token: msg_auth_token, + } } } /// Load the initial Butterfly Member which is used in initializing the Butterfly server.
This @@ -528,20 +552,19 @@ impl Manager { })?; member.id = member_id; } - Err(_) => { - match File::create(&fs_cfg.member_id_file) { - Ok(mut file) => { - file.write(member.id.as_bytes()).map_err(|e| { + Err(_) => match File::create(&fs_cfg.member_id_file) { + Ok(mut file) => { + file.write(member.id.as_bytes()).map_err(|e| { sup_error!(Error::BadDataFile(fs_cfg.member_id_file.clone(), e)) })?; - } - Err(err) => { - return Err(sup_error!(Error::BadDataFile(fs_cfg.member_id_file - .clone(), - err))); - } } - } + Err(err) => { + return Err(sup_error!(Error::BadDataFile( + fs_cfg.member_id_file.clone(), + err + ))); + } + }, } sys.member_id = member.id.to_string(); member.persistent = sys.permanent; @@ -592,12 +615,13 @@ impl Manager { // back to us. Since we consume and deconstruct the spec in `Service::new()` which // `Service::load()` eventually delegates to we just can't have that. We should clean // this up in the future. - let service = match Service::load(self.sys.clone(), - spec.clone(), - self.fs_cfg.clone(), - self.organization.as_ref().map(|org| &**org), - self.state.gateway_state.clone()) - { + let service = match Service::load( + self.sys.clone(), + spec.clone(), + self.fs_cfg.clone(), + self.organization.as_ref().map(|org| &**org), + self.state.gateway_state.clone(), + ) { Ok(service) => { outputln!("Starting {} ({})", &spec.ident, service.pkg.ident); service @@ -624,12 +648,16 @@ impl Manager { } if let Err(e) = service.create_svc_path() { - outputln!("Can't create directory {}: {}", - service.pkg.svc_path.display(), - e); - outputln!("If this service is running as non-root, you'll need to create {} and give \ - the current user write access to it", - service.pkg.svc_path.display()); + outputln!( + "Can't create directory {}: {}", + service.pkg.svc_path.display(), + e + ); + outputln!( + "If this service is running as non-root, you'll need to create {} and give \ + the current user write access to it", + service.pkg.svc_path.display() + ); outputln!("{} failed to start", &spec.ident); return; } @@ -639,14 +667,17 @@ impl Manager { self.butterfly.start_election(&service.service_group, 0); } - if let Err(e) = self.user_config_watcher - .write() - .expect("user-config-watcher lock is poisoned") - .add(&service) + if let Err(e) = self + .user_config_watcher + .write() + .expect("user-config-watcher lock is poisoned") + .add(&service) { - outputln!("Unable to start UserConfigWatcher for {}: {}", - service.spec_ident, - e); + outputln!( + "Unable to start UserConfigWatcher for {}: {}", + service.spec_ident, + e + ); return; } @@ -675,19 +706,20 @@ impl Manager { let mut next_cpu_measurement = SteadyTime::now(); let mut cpu_start = ProcessTime::now(); - let mut runtime = - RuntimeBuilder::new().name_prefix("tokio-") - .core_threads(TokioThreadCount::configured_value().into()) - .build() - .expect("Couldn't build Tokio Runtime!"); + let mut runtime = RuntimeBuilder::new() + .name_prefix("tokio-") + .core_threads(TokioThreadCount::configured_value().into()) + .build() + .expect("Couldn't build Tokio Runtime!"); let (ctl_tx, ctl_rx) = mpsc::unbounded(); let (ctl_shutdown_tx, ctl_shutdown_rx) = oneshot::channel(); - let ctl_handler = - CtlAcceptor::new(self.state.clone(), ctl_rx, ctl_shutdown_rx).for_each(move |handler| { + let ctl_handler = CtlAcceptor::new(self.state.clone(), ctl_rx, ctl_shutdown_rx).for_each( + move |handler| { executor::spawn(handler); Ok(()) - }); + }, + ); runtime.spawn(ctl_handler); if let Some(svc_load) = svc { @@ -698,8 +730,10 @@ impl Manager { // (which will be 
all of them at this point!) self.maybe_spawn_service_futures(&mut runtime); - outputln!("Starting gossip-listener on {}", - self.butterfly.gossip_addr()); + outputln!( + "Starting gossip-listener on {}", + self.butterfly.gossip_addr() + ); self.butterfly.start(Timing::default())?; debug!("gossip-listener started"); self.persist_state(); @@ -718,27 +752,29 @@ impl Manager { // thread, where that process is more cumbersome. let tls_server_config = match &self.state.cfg.tls_config { - Some(c) => { - match tls_config(c) { - Ok(c) => Some(c), - Err(e) => return Err(e), - } - } + Some(c) => match tls_config(c) { + Ok(c) => Some(c), + Err(e) => return Err(e), + }, None => None, }; // Here we use a Condvar to wait on the HTTP gateway server to start up and inspect its // return value. Specifically, we're looking for errors when it tries to bind to the // listening TCP socket, so we can alert the user. - let pair = - Arc::new((Mutex::new(http_gateway::ServerStartup::NotStarted), Condvar::new())); + let pair = Arc::new(( + Mutex::new(http_gateway::ServerStartup::NotStarted), + Condvar::new(), + )); outputln!("Starting http-gateway on {}", &http_listen_addr); - http_gateway::Server::run(http_listen_addr, - tls_server_config, - self.state.gateway_state.clone(), - self.feature_flags, - pair.clone()); + http_gateway::Server::run( + http_listen_addr, + tls_server_config, + self.state.gateway_state.clone(), + self.feature_flags, + pair.clone(), + ); let &(ref lock, ref cvar) = &*pair; let mut started = lock.lock().expect("Control mutex is poisoned"); @@ -810,8 +846,9 @@ impl Manager { if let Ok(exit_file_path) = env::var("HAB_FEAT_TEST_EXIT") { if let Ok(mut exit_code_file) = File::open(&exit_file_path) { let mut buffer = String::new(); - exit_code_file.read_to_string(&mut buffer) - .expect("couldn't read"); + exit_code_file + .read_to_string(&mut buffer) + .expect("couldn't read"); if let Ok(exit_code) = buffer.lines().next().unwrap_or("").parse::() { fs::remove_file(&exit_file_path).expect("couldn't remove"); outputln!("Simulating abrupt, unexpected exit with code {}", exit_code); @@ -836,8 +873,10 @@ impl Manager { } if let Some(package) = self.check_for_updated_supervisor() { - outputln!("Supervisor shutting down for automatic update to {}", - package); + outputln!( + "Supervisor shutting down for automatic update to {}", + package + ); break ShutdownMode::Restarting; } @@ -874,14 +913,15 @@ impl Manager { } self.restart_elections(); - self.census_ring - .update_from_rumors(&self.state.cfg.cache_key_path, - &self.butterfly.service_store, - &self.butterfly.election_store, - &self.butterfly.update_store, - &self.butterfly.member_list, - &self.butterfly.service_config_store, - &self.butterfly.service_file_store); + self.census_ring.update_from_rumors( + &self.state.cfg.cache_key_path, + &self.butterfly.service_store, + &self.butterfly.election_store, + &self.butterfly.update_store, + &self.butterfly.member_list, + &self.butterfly.service_config_store, + &self.butterfly.service_file_store, + ); if self.check_for_changed_services() { self.persist_state(); @@ -891,11 +931,12 @@ impl Manager { self.persist_state(); } - for service in self.state - .services - .write() - .expect("Services lock is poisoned!") - .values_mut() + for service in self + .state + .services + .write() + .expect("Services lock is poisoned!") + .values_mut() { // time will be recorded automatically by HistogramTimer's drop implementation when // this var goes out of scope @@ -917,11 +958,11 @@ impl Manager { // Measure CPU time every 
second if SteadyTime::now() >= next_cpu_measurement { let cpu_duration = cpu_start.elapsed(); - let cpu_nanos = - cpu_duration.as_secs() - .checked_mul(1_000_000_000) - .and_then(|ns| ns.checked_add(cpu_duration.subsec_nanos().into())) - .expect("overflow in cpu_duration"); + let cpu_nanos = cpu_duration + .as_secs() + .checked_mul(1_000_000_000) + .and_then(|ns| ns.checked_add(cpu_duration.subsec_nanos().into())) + .expect("overflow in cpu_duration"); CPU_TIME.set(cpu_nanos.to_i64()); next_cpu_measurement = SteadyTime::now() + TimeDuration::seconds(1); cpu_start = ProcessTime::now(); @@ -945,10 +986,11 @@ impl Manager { outputln!("Gracefully departing from butterfly network."); self.butterfly.set_departed(); - let mut svcs = self.state - .services - .write() - .expect("Services lock is poisoned!"); + let mut svcs = self + .state + .services + .write() + .expect("Services lock is poisoned!"); for (_ident, svc) in svcs.drain() { runtime.spawn(self.stop(svc)); @@ -957,9 +999,10 @@ impl Manager { } // Allow all existing futures to run to completion. - runtime.shutdown_on_idle() - .wait() - .expect("Error waiting on Tokio runtime to shutdown"); + runtime + .shutdown_on_idle() + .wait() + .expect("Error waiting on Tokio runtime to shutdown"); release_process_lock(&self.fs_cfg); self.butterfly.persist_data(); @@ -983,13 +1026,15 @@ impl Manager { fn take_services_with_updates(&mut self) -> Vec { let mut updater = self.updater.lock().expect("Updater lock poisoned"); - let mut state_services = self.state - .services - .write() - .expect("Services lock is poisoned!"); - let idents_to_restart: Vec<_> = state_services.iter() - .filter_map(|(current_ident, service)| { - if let Some(new_ident) = + let mut state_services = self + .state + .services + .write() + .expect("Services lock is poisoned!"); + let idents_to_restart: Vec<_> = state_services + .iter() + .filter_map(|(current_ident, service)| { + if let Some(new_ident) = updater.check_for_updated_package(&service, &self.census_ring) { outputln!("Updating from {} to {}", current_ident, new_ident); @@ -998,8 +1043,8 @@ impl Manager { trace!("No update found for {}", current_ident); None } - }) - .collect(); + }) + .collect(); let mut services_to_restart = Vec::with_capacity(idents_to_restart.len()); for current_ident in idents_to_restart { @@ -1030,13 +1075,14 @@ impl Manager { // Creates a rumor for the specified service. 
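// Peers treat a service rumor as newer only when its incarnation increases, // so we look up this member's last stored rumor below and bump one past it // before re-inserting.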
fn gossip_latest_service_rumor(&self, service: &Service) { - let incarnation = if let Some(rumor) = self.butterfly - .service_store - .list - .read() - .expect("Rumor store lock poisoned") - .get(&*service.service_group) - .and_then(|r| r.get(&self.sys.member_id)) + let incarnation = if let Some(rumor) = self + .butterfly + .service_store + .list + .read() + .expect("Rumor store lock poisoned") + .get(&*service.service_group) + .and_then(|r| r.get(&self.sys.member_id)) { rumor.clone().incarnation + 1 } else { @@ -1046,25 +1092,29 @@ impl Manager { self.butterfly.insert_service(service.to_rumor(incarnation)); } - fn check_for_departure(&self) -> bool { self.butterfly.is_departed() } + fn check_for_departure(&self) -> bool { + self.butterfly.is_departed() + } fn check_for_changed_services(&mut self) -> bool { let mut service_states = HashMap::new(); let mut active_services = Vec::new(); - for service in self.state - .services - .write() - .expect("Services lock is poisoned!") - .values_mut() + for service in self + .state + .services + .write() + .expect("Services lock is poisoned!") + .values_mut() { service_states.insert(service.spec_ident.clone(), service.last_state_change()); active_services.push(service.spec_ident.clone()); } - for loaded in self.spec_dir - .specs() - .iter() - .filter(|s| !active_services.contains(&s.ident)) + for loaded in self + .spec_dir + .specs() + .iter() + .filter(|s| !active_services.contains(&s.ident)) { service_states.insert(loaded.ident.clone(), Timespec::new(0, 0)); } @@ -1113,37 +1163,41 @@ impl Manager { ConfigRendering::Full }; - let services = self.state - .services - .read() - .expect("Services lock is poisoned!"); + let services = self + .state + .services + .read() + .expect("Services lock is poisoned!"); let existing_idents: Vec = services.values().map(|s| s.spec_ident.clone()).collect(); // Services that are not active but are being watched for changes // These would include stopped persistent services or other // persistent services that failed to load - let watched_services: Vec = - self.spec_dir - .specs() - .iter() - .filter(|spec| !existing_idents.contains(&spec.ident)) - .flat_map(|spec| { - Service::load(self.sys.clone(), - spec.clone(), - self.fs_cfg.clone(), - self.organization.as_ref().map(|org| &**org), - self.state.gateway_state.clone()).into_iter() - }) - .collect(); - let watched_service_proxies: Vec> = - watched_services.iter() - .map(|s| ServiceProxy::new(s, config_rendering)) - .collect(); - let mut services_to_render: Vec> = - services.values() - .map(|s| ServiceProxy::new(s, config_rendering)) - .collect(); + let watched_services: Vec = self + .spec_dir + .specs() + .iter() + .filter(|spec| !existing_idents.contains(&spec.ident)) + .flat_map(|spec| { + Service::load( + self.sys.clone(), + spec.clone(), + self.fs_cfg.clone(), + self.organization.as_ref().map(|org| &**org), + self.state.gateway_state.clone(), + ) + .into_iter() + }) + .collect(); + let watched_service_proxies: Vec> = watched_services + .iter() + .map(|s| ServiceProxy::new(s, config_rendering)) + .collect(); + let mut services_to_render: Vec> = services + .values() + .map(|s| ServiceProxy::new(s, config_rendering)) + .collect(); services_to_render.extend(watched_service_proxies); @@ -1156,45 +1210,54 @@ impl Manager { } /// Check if any elections need restarting. - fn restart_elections(&mut self) { self.butterfly.restart_elections(); } + fn restart_elections(&mut self) { + self.butterfly.restart_elections(); + } /// Create a future for stopping a Service. 
The Service is assumed /// to have been removed from the internal list of active services /// already (see, e.g., take_services_with_updates and /// remove_service_from_state). fn stop(&self, service: Service) -> impl Future { - Self::service_stop_future(service, - Arc::clone(&self.user_config_watcher), - Arc::clone(&self.updater), - Arc::clone(&self.busy_services), - self.services_need_reconciliation.clone()) + Self::service_stop_future( + service, + Arc::clone(&self.user_config_watcher), + Arc::clone(&self.updater), + Arc::clone(&self.busy_services), + self.services_need_reconciliation.clone(), + ) } /// Remove the given service from the manager. - fn service_stop_future(service: Service, - user_config_watcher: Arc>, - updater: Arc>, - busy_services: Arc>>, - services_need_reconciliation: ReconciliationFlag) - -> impl Future { + fn service_stop_future( + service: Service, + user_config_watcher: Arc>, + updater: Arc>, + busy_services: Arc>>, + services_need_reconciliation: ReconciliationFlag, + ) -> impl Future { // JW TODO: Update service rumor to remove service from // cluster // TODO (CM): But only if we're not going down for a restart. let ident = service.spec_ident.clone(); let stop_it = service.stop().then(move |_| { - event::service_stopped(&service); - user_config_watcher.write() - .expect("Watcher lock poisoned") - .remove(&service); - updater.lock() - .expect("Updater lock poisoned") - .remove(&service); - Ok(()) - }); - Self::wrap_async_service_operation(ident, - busy_services, - services_need_reconciliation, - stop_it) + event::service_stopped(&service); + user_config_watcher + .write() + .expect("Watcher lock poisoned") + .remove(&service); + updater + .lock() + .expect("Updater lock poisoned") + .remove(&service); + Ok(()) + }); + Self::wrap_async_service_operation( + ident, + busy_services, + services_need_reconciliation, + stop_it, + ) } /// Wrap a future that starts, stops, or restarts a service with @@ -1211,34 +1274,43 @@ impl Manager { /// As more service operations (e.g., hooks) become asynchronous, /// we'll need to wrap those operations in this logic to ensure /// consistent operation. 
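/// /// Call shape, as used by `service_stop_future` above (illustrative): /// ```ignore /// Self::wrap_async_service_operation(ident, busy_services, services_need_reconciliation, stop_it) /// ```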
- fn wrap_async_service_operation(ident: PackageIdent, - busy_services: Arc>>, - services_need_reconciliation: ReconciliationFlag, - fut: F) - -> impl Future - where F: IntoFuture + fn wrap_async_service_operation( + ident: PackageIdent, + busy_services: Arc>>, + services_need_reconciliation: ReconciliationFlag, + fut: F, + ) -> impl Future + where + F: IntoFuture, { // TODO (CM): can't wait for the Pinning API :( let busy_services_2 = Arc::clone(&busy_services); let ident_2 = ident.clone(); future::lazy(move || { - trace!("Flagging '{:?}' as busy, pending an asynchronous operation", - ident); - busy_services.lock() - .expect("busy_services lock is poisoned") - .insert(ident); + trace!( + "Flagging '{:?}' as busy, pending an asynchronous operation", + ident + ); + busy_services + .lock() + .expect("busy_services lock is poisoned") + .insert(ident); + Ok(()) + }) + .and_then(|_| fut) + .and_then(move |_| { + trace!( + "Removing 'busy' flag for '{:?}'; asynchronous operation over", + ident_2 + ); + busy_services_2 + .lock() + .expect("busy_services lock is poisoned") + .remove(&ident_2); + services_need_reconciliation.set(); Ok(()) - }).and_then(|_| fut) - .and_then(move |_| { - trace!("Removing 'busy' flag for '{:?}'; asynchronous operation over", - ident_2); - busy_services_2.lock() - .expect("busy_services lock is poisoned") - .remove(&ident_2); - services_need_reconciliation.set(); - Ok(()) - }) + }) } /// Determine if our on-disk spec files indicate that we should @@ -1273,40 +1345,44 @@ impl Manager { /// operations; starts are performed synchronously, while /// shutdowns and restarts are turned into futures. fn operations_into_futures(&mut self, ops: O) -> Vec> - where O: IntoIterator + where + O: IntoIterator, { ops.into_iter() - .filter_map(|op| { - match op { - ServiceOperation::Stop(spec) - | ServiceOperation::Restart { to_stop: spec, .. } => { - // Yes, Stop and Restart both turn into - // "stop"... Once we've finished stopping, we'll - // end up re-examining the spec file on disk; if - // we should be running, we'll start up again. - // - // This may change in the future, once service - // start can be performed asynchronously in a - // future; then we could just chain that future - // onto the end of the stop one for a *real* - // restart future. - let f = self.remove_service_from_state(&spec) - .map(|service| self.stop(service)); - if f.is_none() { - // We really don't expect this to happen.... - outputln!("Tried to remove service for {} but could not find it \ - running, skipping", - &spec.ident); - } - f - } - ServiceOperation::Start(spec) => { - self.add_service(&spec); - None // No future to return (currently synchronous!) - } - } - }) - .collect() + .filter_map(|op| { + match op { + ServiceOperation::Stop(spec) + | ServiceOperation::Restart { to_stop: spec, .. } => { + // Yes, Stop and Restart both turn into + // "stop"... Once we've finished stopping, we'll + // end up re-examining the spec file on disk; if + // we should be running, we'll start up again. + // + // This may change in the future, once service + // start can be performed asynchronously in a + // future; then we could just chain that future + // onto the end of the stop one for a *real* + // restart future. + let f = self + .remove_service_from_state(&spec) + .map(|service| self.stop(service)); + if f.is_none() { + // We really don't expect this to happen.... 
+ outputln!( + "Tried to remove service for {} but could not find it \ + running, skipping", + &spec.ident + ); + } + f + } + ServiceOperation::Start(spec) => { + self.add_service(&spec); + None // No future to return (currently synchronous!) + } + } + }) + .collect() } /// Determine what services we need to start, stop, or restart in @@ -1316,22 +1392,25 @@ impl Manager { /// See `specs_to_operations` for the real logic. fn compute_service_operations(&mut self) -> Vec { // First, figure out what's currently running. - let services = self.state - .services - .read() - .expect("Services lock is poisoned"); + let services = self + .state + .services + .read() + .expect("Services lock is poisoned"); let currently_running_specs = services.values().map(Service::to_spec); // Now, figure out what we should compare against, ignoring // any services that are currently doing something // asynchronously. - let busy_services = self.busy_services - .lock() - .expect("busy_services lock is poisoned"); - let on_disk_specs = self.spec_dir - .specs() - .into_iter() - .filter(|s| !busy_services.contains(&s.ident)); + let busy_services = self + .busy_services + .lock() + .expect("busy_services lock is poisoned"); + let on_disk_specs = self + .spec_dir + .specs() + .into_iter() + .filter(|s| !busy_services.contains(&s.ident)); Self::specs_to_operations(currently_running_specs, on_disk_specs) } @@ -1339,82 +1418,104 @@ impl Manager { /// Pure utility function to generate a list of operations to /// perform to bring what's currently running with what _should_ be /// running, based on the current on-disk spec files. - fn specs_to_operations(currently_running_specs: C, - on_disk_specs: D) - -> Vec - where C: IntoIterator, - D: IntoIterator + fn specs_to_operations( + currently_running_specs: C, + on_disk_specs: D, + ) -> Vec + where + C: IntoIterator, + D: IntoIterator, { let mut svc_states = HashMap::new(); #[derive(Default, Debug)] struct ServiceState { running: Option, - disk: Option<(DesiredState, ServiceSpec)>, + disk: Option<(DesiredState, ServiceSpec)>, } for rs in currently_running_specs { - svc_states.insert(rs.ident.clone(), - ServiceState { running: Some(rs), - disk: None, }); + svc_states.insert( + rs.ident.clone(), + ServiceState { + running: Some(rs), + disk: None, + }, + ); } for ds in on_disk_specs { let ident = ds.ident.clone(); - svc_states.entry(ident) - .or_insert_with(ServiceState::default) - .disk = Some((ds.desired_state, ds)); + svc_states + .entry(ident) + .or_insert_with(ServiceState::default) + .disk = Some((ds.desired_state, ds)); } - svc_states.into_iter() - .filter_map(|(ident, ss)| { - match ss { - ServiceState { disk: Some((DesiredState::Up, disk_spec)), - running: None, } => { - debug!("Reconciliation: '{}' queued for start", ident); - Some(ServiceOperation::Start(disk_spec)) - } - - ServiceState { disk: Some((DesiredState::Up, disk_spec)), - running: Some(running_spec), } => { - if running_spec == disk_spec { - debug!("Reconciliation: '{}' unchanged", ident); - None - } else { - // TODO (CM): In the future, this would be the - // place where we can evaluate what has changed - // between the spec-on-disk and our in-memory - // representation and potentially just bring our - // in-memory representation in line without having - // to restart the entire service. 
- debug!("Reconciliation: '{}' queued for restart", ident); - Some(ServiceOperation::Restart { to_stop: running_spec, - to_start: disk_spec, }) - } - } - ServiceState { disk: Some((DesiredState::Down, _)), - running: Some(running_spec), } => { - debug!("Reconciliation: '{}' queued for stop", ident); - Some(ServiceOperation::Stop(running_spec)) - } - - ServiceState { disk: Some((DesiredState::Down, _)), - running: None, } => { - debug!("Reconciliation: '{}' should be down, and is", ident); - None - } - - ServiceState { disk: None, - running: Some(running_spec), } => { - debug!("Reconciliation: '{}' queued for shutdown", ident); - Some(ServiceOperation::Stop(running_spec)) - } - - ServiceState { disk: None, - running: None, } => unreachable!(), - } - }) - .collect() + svc_states + .into_iter() + .filter_map(|(ident, ss)| { + match ss { + ServiceState { + disk: Some((DesiredState::Up, disk_spec)), + running: None, + } => { + debug!("Reconciliation: '{}' queued for start", ident); + Some(ServiceOperation::Start(disk_spec)) + } + + ServiceState { + disk: Some((DesiredState::Up, disk_spec)), + running: Some(running_spec), + } => { + if running_spec == disk_spec { + debug!("Reconciliation: '{}' unchanged", ident); + None + } else { + // TODO (CM): In the future, this would be the + // place where we can evaluate what has changed + // between the spec-on-disk and our in-memory + // representation and potentially just bring our + // in-memory representation in line without having + // to restart the entire service. + debug!("Reconciliation: '{}' queued for restart", ident); + Some(ServiceOperation::Restart { + to_stop: running_spec, + to_start: disk_spec, + }) + } + } + ServiceState { + disk: Some((DesiredState::Down, _)), + running: Some(running_spec), + } => { + debug!("Reconciliation: '{}' queued for stop", ident); + Some(ServiceOperation::Stop(running_spec)) + } + + ServiceState { + disk: Some((DesiredState::Down, _)), + running: None, + } => { + debug!("Reconciliation: '{}' should be down, and is", ident); + None + } + + ServiceState { + disk: None, + running: Some(running_spec), + } => { + debug!("Reconciliation: '{}' queued for shutdown", ident); + Some(ServiceOperation::Stop(running_spec)) + } + + ServiceState { + disk: None, + running: None, + } => unreachable!(), + } + }) + .collect() } fn update_peers_from_watch_file(&mut self) -> Result<()> { @@ -1434,16 +1535,18 @@ impl Manager { } fn update_running_services_from_user_config_watcher(&mut self) { - let mut services = self.state - .services - .write() - .expect("Services lock is poisoned"); + let mut services = self + .state + .services + .write() + .expect("Services lock is poisoned"); for service in services.values_mut() { - if self.user_config_watcher - .read() - .expect("user_config_watcher lock is poisoned") - .have_events_for(service) + if self + .user_config_watcher + .read() + .expect("user_config_watcher lock is poisoned") + .have_events_for(service) { outputln!("user.toml changes detected for {}", &service.spec_ident); service.user_config_updated = true; @@ -1457,15 +1560,16 @@ fn tls_config(config: &TLSConfig) -> Result { Some(path) => { let mut root_store = RootCertStore::empty(); let ca_file = &mut BufReader::new(File::open(path)?); - root_store.add_pem_file(ca_file) - .and_then(|(added, _)| { - if added < 1 { - Err(()) - } else { - Ok(AllowAnyAuthenticatedClient::new(root_store)) - } - }) - .map_err(|_| sup_error!(Error::InvalidCertFile(path.clone())))? 
+ root_store + .add_pem_file(ca_file) + .and_then(|(added, _)| { + if added < 1 { + Err(()) + } else { + Ok(AllowAnyAuthenticatedClient::new(root_store)) + } + }) + .map_err(|_| sup_error!(Error::InvalidCertFile(path.clone())))? } None => NoClientAuth::new(), }; @@ -1477,18 +1581,13 @@ fn tls_config(config: &TLSConfig) -> Result { // Note that we must explicitly map these errors because rustls returns () as the error from // both pemfile::certs() as well as pemfile::rsa_private_keys() and we want to return // different errors for each. - let cert_chain = - pemfile::certs(cert_file).and_then(|c| if c.is_empty() { Err(()) } else { Ok(c) }) - .map_err(|_| { - sup_error!(Error::InvalidCertFile(config.cert_path.clone())) - })?; - - let key = - pemfile::rsa_private_keys(key_file).and_then(|mut k| k.pop().ok_or(())) - .map_err(|_| { - sup_error!(Error::InvalidKeyFile(config.key_path - .clone())) - })?; + let cert_chain = pemfile::certs(cert_file) + .and_then(|c| if c.is_empty() { Err(()) } else { Ok(c) }) + .map_err(|_| sup_error!(Error::InvalidCertFile(config.cert_path.clone())))?; + + let key = pemfile::rsa_private_keys(key_file) + .and_then(|mut k| k.pop().ok_or(())) + .map_err(|_| sup_error!(Error::InvalidKeyFile(config.key_path.clone())))?; server_config.set_single_cert(cert_chain, key)?; server_config.ignore_client_order = true; @@ -1511,8 +1610,9 @@ impl FromStr for TokioThreadCount { type Err = Error; fn from_str(s: &str) -> result::Result { - let raw = s.parse::() - .map_err(|_| Error::InvalidTokioThreadCount)?; + let raw = s + .parse::() + .map_err(|_| Error::InvalidTokioThreadCount)?; if raw > 0 { Ok(TokioThreadCount(raw)) } else { @@ -1526,7 +1626,9 @@ impl env::Config for TokioThreadCount { } impl Into for TokioThreadCount { - fn into(self) -> usize { self.0 } + fn into(self) -> usize { + self.0 + } } #[derive(Debug)] @@ -1547,47 +1649,45 @@ impl Suitability for SuitabilityLookup { fn obtain_process_lock(fs_cfg: &FsCfg) -> Result<()> { match write_process_lock(&fs_cfg.proc_lock_file) { Ok(()) => Ok(()), - Err(_) => { - match read_process_lock(&fs_cfg.proc_lock_file) { - Ok(pid) => { - if process::is_alive(pid) { - return Err(sup_error!(Error::ProcessLocked(pid))); - } - release_process_lock(&fs_cfg); - write_process_lock(&fs_cfg.proc_lock_file) - } - Err(SupError { err: Error::ProcessLockCorrupt, - .. }) => { - release_process_lock(&fs_cfg); - write_process_lock(&fs_cfg.proc_lock_file) + Err(_) => match read_process_lock(&fs_cfg.proc_lock_file) { + Ok(pid) => { + if process::is_alive(pid) { + return Err(sup_error!(Error::ProcessLocked(pid))); } - Err(err) => Err(err), + release_process_lock(&fs_cfg); + write_process_lock(&fs_cfg.proc_lock_file) } - } + Err(SupError { + err: Error::ProcessLockCorrupt, + .. 
+ }) => { + release_process_lock(&fs_cfg); + write_process_lock(&fs_cfg.proc_lock_file) + } + Err(err) => Err(err), + }, } } fn read_process_lock(lock_path: T) -> Result - where T: AsRef +where + T: AsRef, { match File::open(lock_path.as_ref()) { Ok(file) => { let reader = BufReader::new(file); match reader.lines().next() { - Some(Ok(line)) => { - match line.parse::() { - Ok(pid) => Ok(pid), - Err(_) => Err(sup_error!(Error::ProcessLockCorrupt)), - } - } + Some(Ok(line)) => match line.parse::() { + Ok(pid) => Ok(pid), + Err(_) => Err(sup_error!(Error::ProcessLockCorrupt)), + }, _ => Err(sup_error!(Error::ProcessLockCorrupt)), } } - Err(err) => { - Err(sup_error!(Error::ProcessLockIO(lock_path.as_ref() - .to_path_buf(), - err))) - } + Err(err) => Err(sup_error!(Error::ProcessLockIO( + lock_path.as_ref().to_path_buf(), + err + ))), } } @@ -1598,11 +1698,13 @@ fn release_process_lock(fs_cfg: &FsCfg) { } fn write_process_lock(lock_path: T) -> Result<()> - where T: AsRef +where + T: AsRef, { - match OpenOptions::new().write(true) - .create_new(true) - .open(lock_path.as_ref()) + match OpenOptions::new() + .write(true) + .create_new(true) + .open(lock_path.as_ref()) { Ok(mut file) => { let pid = match env::var(LAUNCHER_PID_ENV) { @@ -1611,18 +1713,16 @@ fn write_process_lock(lock_path: T) -> Result<()> }; match write!(&mut file, "{}", pid) { Ok(()) => Ok(()), - Err(err) => { - Err(sup_error!(Error::ProcessLockIO(lock_path.as_ref() - .to_path_buf(), - err))) - } + Err(err) => Err(sup_error!(Error::ProcessLockIO( + lock_path.as_ref().to_path_buf(), + err + ))), } } - Err(err) => { - Err(sup_error!(Error::ProcessLockIO(lock_path.as_ref() - .to_path_buf(), - err))) - } + Err(err) => Err(sup_error!(Error::ProcessLockIO( + lock_path.as_ref().to_path_buf(), + err + ))), } } @@ -1637,16 +1737,18 @@ fn get_fd_count() -> std::io::Result { // these are ints here because GetProcessHandleCount returns a BOOL which is actually // an i32 1 => Ok(count as usize), - _ => { - Err(std::io::Error::new(std::io::ErrorKind::Other, - "error getting file descriptor count")) - } + _ => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "error getting file descriptor count", + )), } } } #[cfg(unix)] -fn get_fd_count() -> std::io::Result { proc_self::FdIter::new().map(|f| f.count()) } +fn get_fd_count() -> std::io::Result { + proc_self::FdIter::new().map(|f| f.count()) +} #[cfg(unix)] fn track_memory_stats() { @@ -1654,18 +1756,24 @@ fn track_memory_stats() { // when the epoch is advanced. We manually advance it here to ensure our stats are // fresh. 
jemalloc_ctl::epoch().unwrap(); - MEMORY_STATS.with_label_values(&["active"]) - .set(jemalloc_ctl::stats::active().unwrap().to_i64()); - MEMORY_STATS.with_label_values(&["allocated"]) - .set(jemalloc_ctl::stats::allocated().unwrap().to_i64()); - MEMORY_STATS.with_label_values(&["mapped"]) - .set(jemalloc_ctl::stats::mapped().unwrap().to_i64()); - MEMORY_STATS.with_label_values(&["metadata"]) - .set(jemalloc_ctl::stats::metadata().unwrap().to_i64()); - MEMORY_STATS.with_label_values(&["resident"]) - .set(jemalloc_ctl::stats::resident().unwrap().to_i64()); - MEMORY_STATS.with_label_values(&["retained"]) - .set(jemalloc_ctl::stats::retained().unwrap().to_i64()); + MEMORY_STATS + .with_label_values(&["active"]) + .set(jemalloc_ctl::stats::active().unwrap().to_i64()); + MEMORY_STATS + .with_label_values(&["allocated"]) + .set(jemalloc_ctl::stats::allocated().unwrap().to_i64()); + MEMORY_STATS + .with_label_values(&["mapped"]) + .set(jemalloc_ctl::stats::mapped().unwrap().to_i64()); + MEMORY_STATS + .with_label_values(&["metadata"]) + .set(jemalloc_ctl::stats::metadata().unwrap().to_i64()); + MEMORY_STATS + .with_label_values(&["resident"]) + .set(jemalloc_ctl::stats::resident().unwrap().to_i64()); + MEMORY_STATS + .with_label_values(&["retained"]) + .set(jemalloc_ctl::stats::retained().unwrap().to_i64()); } // This is a no-op on purpose because windows doesn't support jemalloc @@ -1673,19 +1781,22 @@ fn track_memory_stats() { fn track_memory_stats() {} struct CtlAcceptor { - rx: ctl_gateway::server::MgrReceiver, - state: Arc, + rx: ctl_gateway::server::MgrReceiver, + state: Arc, shutdown_trigger: oneshot::Receiver<()>, } impl CtlAcceptor { - fn new(state: Arc, - rx: ctl_gateway::server::MgrReceiver, - shutdown_trigger: oneshot::Receiver<()>) - -> Self { - CtlAcceptor { state, - rx, - shutdown_trigger } + fn new( + state: Arc, + rx: ctl_gateway::server::MgrReceiver, + shutdown_trigger: oneshot::Receiver<()>, + ) -> Self { + CtlAcceptor { + state, + rx, + shutdown_trigger, + } } } @@ -1703,26 +1814,24 @@ impl Stream for CtlAcceptor { error!("Error polling CtlAcceptor shutdown trigger: {:?}", e); Ok(Async::Ready(None)) } - Ok(Async::NotReady) => { - match self.rx.poll() { - Ok(Async::Ready(Some(cmd))) => { - let task = CtlHandler::new(cmd, self.state.clone()); - Ok(Async::Ready(Some(task))) - } - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => { - debug!("CtlAcceptor error, {:?}", e); - Err(()) - } + Ok(Async::NotReady) => match self.rx.poll() { + Ok(Async::Ready(Some(cmd))) => { + let task = CtlHandler::new(cmd, self.state.clone()); + Ok(Async::Ready(Some(task))) } - } + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + debug!("CtlAcceptor error, {:?}", e); + Err(()) + } + }, } } } struct CtlHandler { - cmd: ctl_gateway::server::CtlCommand, + cmd: ctl_gateway::server::CtlCommand, state: Arc, } @@ -1768,8 +1877,10 @@ mod test { assert!(!f.toggle_if_set(), "Should not be set!"); f.set(); assert!(f.toggle_if_set(), "Should have been toggled, but wasn't!"); - assert!(!f.toggle_if_set(), - "Should no longer be toggled, after having been toggled previously!"); + assert!( + !f.toggle_if_set(), + "Should no longer be toggled, after having been toggled previously!" + ); } } @@ -1778,23 +1889,25 @@ mod test { // code, so only implement it under test configuration. 
impl Default for ManagerConfig { fn default() -> Self { - ManagerConfig { auto_update: false, - custom_state_path: None, - cache_key_path: cache_key_path(Some(&*FS_ROOT)), - update_url: "".to_string(), - update_channel: ChannelIdent::default(), - gossip_listen: GossipListenAddr::default(), - ctl_listen: ListenCtlAddr::default(), - http_listen: http_gateway::ListenAddr::default(), - http_disable: false, - gossip_peers: vec![], - gossip_permanent: false, - ring_key: None, - organization: None, - watch_peer_file: None, - tls_config: None, - feature_flags: FeatureFlag::empty(), - event_stream_config: None, } + ManagerConfig { + auto_update: false, + custom_state_path: None, + cache_key_path: cache_key_path(Some(&*FS_ROOT)), + update_url: "".to_string(), + update_channel: ChannelIdent::default(), + gossip_listen: GossipListenAddr::default(), + ctl_listen: ListenCtlAddr::default(), + http_listen: http_gateway::ListenAddr::default(), + http_disable: false, + gossip_peers: vec![], + gossip_permanent: false, + ring_key: None, + organization: None, + watch_peer_file: None, + tls_config: None, + feature_flags: FeatureFlag::empty(), + event_stream_config: None, + } } } @@ -1803,8 +1916,10 @@ mod test { let cfg = ManagerConfig::default(); let path = cfg.sup_root(); - assert_eq!(PathBuf::from(format!("{}/default", STATE_PATH_PREFIX.to_string_lossy())), - path); + assert_eq!( + PathBuf::from(format!("{}/default", STATE_PATH_PREFIX.to_string_lossy())), + path + ); } #[test] @@ -1867,8 +1982,10 @@ mod test { /// Helper function for generating a basic spec from an /// identifier string fn new_spec(ident: &str) -> ServiceSpec { - ServiceSpec::default_for(PackageIdent::from_str(ident).expect("couldn't parse ident \ - str")) + ServiceSpec::default_for(PackageIdent::from_str(ident).expect( + "couldn't parse ident \ + str", + )) } #[test] @@ -1955,8 +2072,10 @@ mod test { assert_eq!(operations.len(), 1); match operations[0] { - ServiceOperation::Restart { to_stop: ref old, - to_start: ref new, } => { + ServiceOperation::Restart { + to_stop: ref old, + to_start: ref new, + } => { assert_eq!(old.ident, new.ident); assert_eq!(old.update_strategy, UpdateStrategy::None); assert_eq!(new.update_strategy, UpdateStrategy::AtOnce); @@ -2003,25 +2122,32 @@ mod test { // This should get shut down let svc_6_running = new_spec("core/lolwut"); - let running = vec![svc_1_running.clone(), - svc_2_running.clone(), - svc_3_running.clone(), - svc_6_running.clone(),]; - - let on_disk = vec![svc_1_on_disk.clone(), - svc_2_on_disk.clone(), - svc_3_on_disk.clone(), - svc_4_on_disk.clone(), - svc_5_on_disk.clone(),]; + let running = vec![ + svc_1_running.clone(), + svc_2_running.clone(), + svc_3_running.clone(), + svc_6_running.clone(), + ]; + + let on_disk = vec![ + svc_1_on_disk.clone(), + svc_2_on_disk.clone(), + svc_3_on_disk.clone(), + svc_4_on_disk.clone(), + svc_5_on_disk.clone(), + ]; let operations = Manager::specs_to_operations(running, on_disk); - let expected_operations = - vec![ServiceOperation::Stop(svc_2_running.clone()), - ServiceOperation::Restart { to_stop: svc_3_running.clone(), - to_start: svc_3_on_disk.clone(), }, - ServiceOperation::Start(svc_5_on_disk.clone()), - ServiceOperation::Stop(svc_6_running.clone()),]; + let expected_operations = vec![ + ServiceOperation::Stop(svc_2_running.clone()), + ServiceOperation::Restart { + to_stop: svc_3_running.clone(), + to_start: svc_3_on_disk.clone(), + }, + ServiceOperation::Start(svc_5_on_disk.clone()), + ServiceOperation::Stop(svc_6_running.clone()), + ]; // Ideally, we'd 
just sort `operations` and // `expected_operations`, but we can't, since that would @@ -2031,13 +2157,17 @@ mod test { // comparable. // // Instead, we'll just do the verification one at a time. - assert_eq!(operations.len(), - expected_operations.len(), - "Didn't generate the expected number of operations"); + assert_eq!( + operations.len(), + expected_operations.len(), + "Didn't generate the expected number of operations" + ); for op in expected_operations { - assert!(operations.contains(&op), - "Should have expected operation: {:?}", - op); + assert!( + operations.contains(&op), + "Should have expected operation: {:?}", + op + ); } } }
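
The lib.rs TODO asks for a real error type in place of the `unwrap()` in AutomateAuthToken::from_env. A minimal sketch of what that could look like; the AutomateAuthTokenError name and the empty-value check are assumptions for illustration, not part of this diff:

    use std::{env, fmt};

    #[derive(Debug)]
    pub enum AutomateAuthTokenError {
        // HAB_AUTOMATE_AUTH_TOKEN is unset or not valid Unicode.
        Missing(env::VarError),
        // HAB_AUTOMATE_AUTH_TOKEN is set but empty.
        Empty,
    }

    impl fmt::Display for AutomateAuthTokenError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                AutomateAuthTokenError::Missing(e) => {
                    write!(f, "HAB_AUTOMATE_AUTH_TOKEN not found: {}", e)
                }
                AutomateAuthTokenError::Empty => {
                    write!(f, "HAB_AUTOMATE_AUTH_TOKEN must not be empty")
                }
            }
        }
    }

    impl AutomateAuthToken {
        fn from_env() -> Result<Self, AutomateAuthTokenError> {
            let raw = env::var("HAB_AUTOMATE_AUTH_TOKEN").map_err(AutomateAuthTokenError::Missing)?;
            if raw.is_empty() {
                Err(AutomateAuthTokenError::Empty)
            } else {
                Ok(AutomateAuthToken(raw))
            }
        }
    }

The call site in Manager::new could then map the failure into a SupError instead of panicking.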
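
The TODO in init_conn_info notes that the connection parameters are hard-coded. One way to lift them out, sketched here, is to read them from the environment and keep the current prototype values as fallbacks; the HAB_EVENT_STREAM_* variable names are invented purely for illustration:

    fn init_conn_info(msg_auth_token: AutomateAuthToken) -> event::EventConnectionInfo {
        // Fall back to the prototype defaults when the variables are unset.
        let cluster_uri = env::var("HAB_EVENT_STREAM_CLUSTER_URI")
            .unwrap_or_else(|_| String::from("10.0.0.174:4222"));
        let cluster_id = env::var("HAB_EVENT_STREAM_CLUSTER_ID")
            .unwrap_or_else(|_| String::from("event-service"));
        EventConnectionInfo {
            name: String::from("habitat"),
            verbose: true,
            cluster_uri,
            cluster_id,
            auth_token: msg_auth_token,
        }
    }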
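
Since the token is a credential (note the println! added to init_stream and the hard-coded admin token in EventConnectionInfo::default), logging it verbatim is risky. A sketch of a masking Debug impl, in place of the derived one:

    use std::fmt;

    impl fmt::Debug for AutomateAuthToken {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Show only a short prefix so logs cannot leak the whole credential.
            let prefix: String = self.0.chars().take(4).collect();
            write!(f, "AutomateAuthToken({}...)", prefix)
        }
    }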