diff --git a/bin/src/command/mod.rs b/bin/src/command/mod.rs index 361da1bfc..585fc4e7f 100644 --- a/bin/src/command/mod.rs +++ b/bin/src/command/mod.rs @@ -617,11 +617,7 @@ impl CommandServer { incr!("worker_restart"); let new_worker_id = self.next_worker_id; - let listeners = Some(Listeners { - http: Vec::new(), - tls: Vec::new(), - tcp: Vec::new(), - }); + let listeners = Some(Listeners::default()); let mut new_worker = start_worker( new_worker_id, diff --git a/bin/src/command/requests.rs b/bin/src/command/requests.rs index 5eae67c7b..37a044561 100644 --- a/bin/src/command/requests.rs +++ b/bin/src/command/requests.rs @@ -419,11 +419,7 @@ impl CommandServer { info!( "sending listeners: to the new worker: {:?}", - worker.scm_socket.send_listeners(&Listeners { - http: Vec::new(), - tls: Vec::new(), - tcp: Vec::new(), - }) + worker.scm_socket.send_listeners(&Listeners::default()) ); let activate_requests = self.state.generate_activate_requests(); diff --git a/bin/src/command_v2/mod.rs b/bin/src/command_v2/mod.rs index c3f9faf02..13bb570a1 100644 --- a/bin/src/command_v2/mod.rs +++ b/bin/src/command_v2/mod.rs @@ -54,7 +54,7 @@ pub fn start_server( let mut command_hub = CommandHub::new(unix_listener, config, executable_path)?; for _ in 0..worker_count { - command_hub.launch_new_worker(); + command_hub.launch_new_worker(None); } load_static_config(&mut command_hub.server, None, None); diff --git a/bin/src/command_v2/requests.rs b/bin/src/command_v2/requests.rs index 452095af7..3f407f7fe 100644 --- a/bin/src/command_v2/requests.rs +++ b/bin/src/command_v2/requests.rs @@ -289,11 +289,7 @@ fn set_logging_level(server: &mut Server, client: &mut ClientSession, logging_fi ); // notify the workers too - worker_request( - server, - client, - RequestType::Logging(logging_filter.clone()).into(), - ); + worker_request(server, client, RequestType::Logging(logging_filter)); } fn subscribe_client_to_events(server: &mut Server, client: &mut ClientSession) { @@ -389,7 +385,7 @@ pub fn load_static_config( ) { let task_id = server.new_task(Box::new(LoadStaticConfigTask { gatherer: DefaultGatherer::default(), - client_token: client.as_ref().map(|c| c.token.clone()), + client_token: client.as_ref().map(|c| c.token), })); let new_config; @@ -852,7 +848,6 @@ impl GatheringTask for StatusTask { struct StopTask { pub client_token: Token, pub gatherer: DefaultGatherer, - pub hardness: bool, } /// stop the main process and workers, true for hard stop @@ -860,7 +855,6 @@ fn stop(server: &mut Server, client: &mut ClientSession, hardness: bool) { let task = Box::new(StopTask { client_token: client.token, gatherer: DefaultGatherer::default(), - hardness, }); server.stopping = true; @@ -885,7 +879,7 @@ impl GatheringTask for StopTask { fn on_finish( self: Box, - server: &mut Server, + _server: &mut Server, client: &mut ClientSession, _timed_out: bool, ) { diff --git a/bin/src/command_v2/server.rs b/bin/src/command_v2/server.rs index 5f1ecbfff..05b70f7f4 100644 --- a/bin/src/command_v2/server.rs +++ b/bin/src/command_v2/server.rs @@ -335,7 +335,7 @@ impl CommandHub { ); if is_finished { let task = self.tasks.remove(&task_id).unwrap(); - debug!("Removed this task: {:#?}", task); + info!("Removed this task: {:#?}", task); task.job.on_finish(&mut self.server, client, false); self.handle_finish_task(task_id); } @@ -354,7 +354,7 @@ impl CommandHub { .on_message_no_client(&mut self.server, worker_id, response); if is_finished { let task = self.tasks.remove(&task_id).unwrap(); - debug!("Removed this task: {:#?}", task); + info!("Removed this task: {:#?}", task); task.job.on_finish_no_client(&mut self.server, false); self.handle_finish_task(task_id); } @@ -425,18 +425,14 @@ impl Server { /// Forks main process into a new worker, register the worker in mio, /// send a Status request to the worker - pub fn launch_new_worker(&mut self) -> &mut WorkerSession { + pub fn launch_new_worker(&mut self, listeners: Option) -> &mut WorkerSession { let worker_id = self.next_worker_id(); let (worker_pid, main_to_worker_channel, main_to_worker_scm) = fork_main_into_worker( &worker_id.to_string(), &self.config, self.executable_path.clone(), &self.state, - Some(Listeners { - http: Vec::new(), - tls: Vec::new(), - tcp: Vec::new(), - }), + Some(listeners.unwrap_or_default()), ) .expect("could not fork main into new worker"); @@ -448,7 +444,7 @@ impl Server { ); worker_session.send(&WorkerRequest { - id: worker_id.to_string(), + id: format!("INITIAL-STATUS-{worker_id}"), content: RequestType::Status(Status {}).into(), }); @@ -510,7 +506,7 @@ impl Server { ) -> &mut WorkerSession { let token = self.next_session_token(); self.register(token, &mut channel.sock); - let worker_session = self.workers.insert( + self.workers.insert( token, WorkerSession::new(channel, worker_id, pid, token, scm_socket), ); @@ -556,10 +552,19 @@ impl Server { }; for worker in self.workers.values_mut().filter(|w| { - (w.run_state == RunState::Running) && target.map(|id| id == w.id).unwrap_or(true) + target + .map(|id| id == w.id && w.run_state != RunState::Stopped) + .unwrap_or(w.run_state == RunState::Running) }) { worker_count += 1; - worker_request.id = format!("{}-{}-query-{}", task_id, request_id, worker.id); + worker_request.id = format!( + "{}-{}-{}-{}", + worker_request.content.short_name(), + worker.id, + task_id, + request_id, + ); + info!("scattering on {} request: {:#?}", worker.id, worker_request); worker.send(&worker_request); self.in_flight.insert(worker_request.id, task_id); } @@ -578,10 +583,10 @@ impl Server { /// - calls `close_worker` /// - calls `start_worker` if needed pub fn handle_worker_close(&mut self, token: &Token) { - let worker_id = match self.workers.get(token) { + let (worker_id, worker_run_state) = match self.workers.get(token) { Some(worker) => { info!("closing worker {:#?}", worker); - worker.id + (worker.id, worker.run_state) } None => { error!("No worker exists with token {:?}", token); @@ -591,9 +596,12 @@ impl Server { self.close_worker(token); - if self.config.worker_automatic_restart && !self.stopping { + if self.config.worker_automatic_restart + && worker_run_state == RunState::Running + && !self.stopping + { info!("Automatically restarting worker {}", worker_id); - self.launch_new_worker(); + self.launch_new_worker(None); } } @@ -605,7 +613,6 @@ impl Server { return; } }; - info!("closing worker {:?}", worker); match kill(Pid::from_raw(worker.pid), Signal::SIGKILL) { Ok(()) => info!("Worker {} was successfully killed", worker.id), diff --git a/bin/src/command_v2/sessions.rs b/bin/src/command_v2/sessions.rs index f89ac4122..b755b945a 100644 --- a/bin/src/command_v2/sessions.rs +++ b/bin/src/command_v2/sessions.rs @@ -179,7 +179,7 @@ impl WorkerSession { RunState::Running | RunState::NotAnswering => RunState::NotAnswering, }; WorkerInfo { - id: self.id as u32, + id: self.id, pid: self.pid, run_state: run_state as i32, } diff --git a/bin/src/command_v2/upgrade.rs b/bin/src/command_v2/upgrade.rs index ebb7cec35..e6b2b78df 100644 --- a/bin/src/command_v2/upgrade.rs +++ b/bin/src/command_v2/upgrade.rs @@ -1,11 +1,11 @@ use mio::Token; use sozu_command_lib::{ - proto::command::{request::RequestType, ReturnListenSockets, RunState, SoftStop}, - scm_socket::Listeners, + proto::command::{request::RequestType, ResponseStatus, ReturnListenSockets, SoftStop}, + response::WorkerResponse, }; use super::{ - server::{DefaultGatherer, GatheringTask, Server, WorkerId}, + server::{Gatherer, GatheringTask, Server, WorkerId}, sessions::ClientSession, }; @@ -13,20 +13,27 @@ use super::{ enum UpgradeWorkerProgress { /// 1. request listeners from the old worker /// 2. store listeners to pass them to new worker, - RequestingListenSockets, + RequestingListenSockets { + old_worker_token: Token, + old_worker_id: WorkerId, + }, /// 3. soft stop the old worker - SoftStop, - /// activate the listeners of the new worker - ActivateNewWorker, + /// 4. activate the listeners of the new worker + StopOldActivateNew { + old_worker_id: WorkerId, + new_worker_id: WorkerId, + }, } #[derive(Debug)] struct UpgradeWorkerTask { - pub old_worker_token: Token, - pub new_worker_token: Token, pub client_token: Token, - pub gatherer: DefaultGatherer, progress: UpgradeWorkerProgress, + + ok: usize, + errors: usize, + responses: Vec<(WorkerId, WorkerResponse)>, + expected_responses: usize, } pub fn upgrade_worker(server: &mut Server, client: &mut ClientSession, old_worker_id: WorkerId) { @@ -46,35 +53,50 @@ pub fn upgrade_worker(server: &mut Server, client: &mut ClientSession, old_worke } }; - let new_worker = server.launch_new_worker(); - - client.return_processing(format!("Launched a new worker with id {}", new_worker.id)); - let new_worker_token = new_worker.token; - + client.return_processing(format!( + "Requesting listen sockets from worker {old_worker_id}" + )); server.scatter( RequestType::ReturnListenSockets(ReturnListenSockets {}).into(), Box::new(UpgradeWorkerTask { - old_worker_token, - new_worker_token, client_token: client.token, - gatherer: DefaultGatherer::default(), - progress: UpgradeWorkerProgress::RequestingListenSockets, + progress: UpgradeWorkerProgress::RequestingListenSockets { + old_worker_token, + old_worker_id, + }, + ok: 0, + errors: 0, + responses: Vec::new(), + expected_responses: 0, }), Some(old_worker_id), ); } impl UpgradeWorkerTask { - fn receive_listen_sockets(self: Box, server: &mut Server, client: &mut ClientSession) { - let old_worker = server - .workers - .get_mut(&self.old_worker_token) - .expect("Lol the old worker died already"); + fn receive_listen_sockets( + self, + server: &mut Server, + client: &mut ClientSession, + old_worker_token: Token, + old_worker_id: WorkerId, + ) { + let old_worker = match server.workers.get_mut(&old_worker_token) { + Some(old_worker) => old_worker, + None => { + client.finish_failure(format!("Worker {old_worker_id} died while upgrading, it should be restarted automatically")); + return; + } + }; + let old_worker_id = old_worker.id; - old_worker - .scm_socket - .set_blocking(true) - .expect("could not set SCM sockets to blocking"); + match old_worker.scm_socket.set_blocking(true) { + Ok(_) => {} + Err(error) => { + client.finish_failure(format!("Could not set SCM sockets to blocking: {error:?}")); + return; + } + } let listeners = match old_worker.scm_socket.receive_listeners() { Ok(listeners) => listeners, @@ -86,59 +108,42 @@ impl UpgradeWorkerTask { } }; + // lauch new worker + let new_worker = server.launch_new_worker(Some(listeners)); + client.return_processing(format!("Launched a new worker with id {}", new_worker.id)); + let new_worker_id = new_worker.id; + + let finish_task = server.new_task(Box::new(UpgradeWorkerTask { + client_token: self.client_token, + progress: UpgradeWorkerProgress::StopOldActivateNew { + old_worker_id, + new_worker_id, + }, + + ok: 0, + errors: 0, + responses: Vec::new(), + expected_responses: 0, + })); + // Stop the old worker - old_worker.run_state = RunState::Stopping; - let old_worker_id = old_worker.id; - server.scatter( + client.return_processing(format!("Soft stopping worker with id {}", old_worker_id)); + server.scatter_on( RequestType::SoftStop(SoftStop {}).into(), - Box::new(UpgradeWorkerTask { - old_worker_token: self.old_worker_token, - new_worker_token: self.new_worker_token, - client_token: self.client_token, - gatherer: DefaultGatherer::default(), - progress: UpgradeWorkerProgress::SoftStop, - }), + finish_task, + 0, Some(old_worker_id), ); - // send listeners to new worker - let new_worker = server - .workers - .get_mut(&self.new_worker_token) - .expect("Yo where's my worker at?"); - new_worker - .scm_socket - .send_listeners(&listeners) - .expect("could not send listeners to new worker"); - listeners.close(); - - let new_worker_id = new_worker.id; - // activate new worker - let activate_worker_task = server.new_task(Box::new(UpgradeWorkerTask { - old_worker_token: self.old_worker_token, - new_worker_token: self.new_worker_token, - client_token: self.client_token, - gatherer: DefaultGatherer::default(), - progress: UpgradeWorkerProgress::ActivateNewWorker, - })); - for (count, request) in server .state .generate_activate_requests() .into_iter() .enumerate() { - server.scatter_on(request, activate_worker_task, count, Some(new_worker_id)); + server.scatter_on(request, finish_task, count + 1, Some(new_worker_id)); } - - client.finish_ok( - None, - format!( - "finished upgrading worker {} to new worker {}", - old_worker_id, new_worker_id - ), - ); } } @@ -148,7 +153,7 @@ impl GatheringTask for UpgradeWorkerTask { } fn get_gatherer(&mut self) -> &mut dyn super::server::Gatherer { - &mut self.gatherer + self } fn on_finish( @@ -158,22 +163,68 @@ impl GatheringTask for UpgradeWorkerTask { _timedout: bool, ) { match self.progress { - UpgradeWorkerProgress::RequestingListenSockets => { - if self.gatherer.ok == 1 { - self.receive_listen_sockets(server, client); + UpgradeWorkerProgress::RequestingListenSockets { + old_worker_token, + old_worker_id, + } => { + if self.ok == 1 { + self.receive_listen_sockets(server, client, old_worker_token, old_worker_id); } else { client.finish_failure(format!( - "could not get listen sockets from old worker:{:?}", - self.gatherer.responses + "Could not get listen sockets from old worker:{:?}", + self.responses )); } } - UpgradeWorkerProgress::SoftStop => { - info!("finished soft stop of worker {:?}", self.old_worker_token) + UpgradeWorkerProgress::StopOldActivateNew { + old_worker_id, + new_worker_id, + } => { + client.finish_ok( + None, + format!( + "Finished soft stop of worker {:?}\nfinished activation of new worker {:?}\nUpgrade successful", + old_worker_id, new_worker_id + ), + ); } - UpgradeWorkerProgress::ActivateNewWorker => { - info!("finished activation of new worker {:?}", self.new_worker_token) + } + } +} + +impl Gatherer for UpgradeWorkerTask { + fn inc_expected_responses(&mut self, count: usize) { + self.expected_responses += count; + } + + fn on_message( + &mut self, + _server: &mut Server, + client: &mut ClientSession, + worker_id: WorkerId, + message: WorkerResponse, + ) -> bool { + match message.status { + ResponseStatus::Ok => { + self.ok += 1; + match self.progress { + UpgradeWorkerProgress::RequestingListenSockets { .. } => {} + UpgradeWorkerProgress::StopOldActivateNew { .. } => { + client.return_processing(format!( + "Worker {} answered OK to {}. {}", + worker_id, message.id, message.message + )) + } + } } + ResponseStatus::Failure => self.errors += 1, + ResponseStatus::Processing => client.return_processing(format!( + "Worker {} is processing {}. {}", + worker_id, message.id, message.message + )), } + self.responses.push((worker_id, message)); + + self.ok + self.errors >= self.expected_responses } } diff --git a/bin/src/ctl/command.rs b/bin/src/ctl/command.rs index 0c30bfcea..82dd7c01d 100644 --- a/bin/src/ctl/command.rs +++ b/bin/src/ctl/command.rs @@ -31,7 +31,7 @@ impl CommandManager { match response.status() { ResponseStatus::Processing => { if !self.json { - debug!("Proxy is processing: {}", response.message); + debug!("Processing: {}", response.message); } } ResponseStatus::Failure => bail!("Request failed: {}", response.message), @@ -62,7 +62,7 @@ impl CommandManager { match response.status() { ResponseStatus::Processing => { if !self.json { - debug!("Proxy is processing: {}", response.message); + debug!("Processing: {}", response.message); } } ResponseStatus::Failure => bail!("Request failed: {}", response.message), @@ -183,7 +183,7 @@ impl CommandManager { match response.status() { ResponseStatus::Processing => { if !self.json { - info!("Proxy is processing: {}", response.message); + info!("Processing: {}", response.message); } } ResponseStatus::Failure => bail!( @@ -229,7 +229,7 @@ impl CommandManager { match response.status() { ResponseStatus::Processing => { if !self.json { - debug!("Proxy is processing: {}", response.message); + debug!("Processing: {}", response.message); } } ResponseStatus::Failure | ResponseStatus::Ok => { diff --git a/bin/src/worker.rs b/bin/src/worker.rs index 9e1e1551f..2a05364c2 100644 --- a/bin/src/worker.rs +++ b/bin/src/worker.rs @@ -30,7 +30,7 @@ use sozu_command_lib::{ channel::Channel, config::Config, logging::setup_logging_with_config, - proto::command::{request::RequestType, Request, RunState, Status, WorkerInfo}, + proto::command::{Request, RunState, WorkerInfo}, ready::Ready, request::{read_requests_from_file, WorkerRequest}, response::WorkerResponse, diff --git a/command/src/request.rs b/command/src/request.rs index 79111eb16..8c3753c92 100644 --- a/command/src/request.rs +++ b/command/src/request.rs @@ -112,6 +112,54 @@ impl Request { Some(RequestType::SoftStop(_)) | Some(RequestType::HardStop(_)) ) } + + pub fn short_name(&self) -> &str { + match self.request_type { + Some(RequestType::SaveState(_)) => "SaveState", + Some(RequestType::LoadState(_)) => "LoadState", + Some(RequestType::ListWorkers(_)) => "ListWorkers", + Some(RequestType::ListFrontends(_)) => "ListFrontends", + Some(RequestType::ListListeners(_)) => "ListListeners", + Some(RequestType::LaunchWorker(_)) => "LaunchWorker", + Some(RequestType::UpgradeMain(_)) => "UpgradeMain", + Some(RequestType::UpgradeWorker(_)) => "UpgradeWorker", + Some(RequestType::SubscribeEvents(_)) => "SubscribeEvents", + Some(RequestType::ReloadConfiguration(_)) => "ReloadConfiguration", + Some(RequestType::Status(_)) => "Status", + Some(RequestType::AddCluster(_)) => "AddCluster", + Some(RequestType::RemoveCluster(_)) => "RemoveCluster", + Some(RequestType::AddHttpFrontend(_)) => "AddHttpFrontend", + Some(RequestType::RemoveHttpFrontend(_)) => "RemoveHttpFrontend", + Some(RequestType::AddHttpsFrontend(_)) => "AddHttpsFrontend", + Some(RequestType::RemoveHttpsFrontend(_)) => "RemoveHttpsFrontend", + Some(RequestType::AddCertificate(_)) => "AddCertificate", + Some(RequestType::ReplaceCertificate(_)) => "ReplaceCertificate", + Some(RequestType::RemoveCertificate(_)) => "RemoveCertificate", + Some(RequestType::AddTcpFrontend(_)) => "AddTcpFrontend", + Some(RequestType::RemoveTcpFrontend(_)) => "RemoveTcpFrontend", + Some(RequestType::AddBackend(_)) => "AddBackend", + Some(RequestType::RemoveBackend(_)) => "RemoveBackend", + Some(RequestType::AddHttpListener(_)) => "AddHttpListener", + Some(RequestType::AddHttpsListener(_)) => "AddHttpsListener", + Some(RequestType::AddTcpListener(_)) => "AddTcpListener", + Some(RequestType::RemoveListener(_)) => "RemoveListener", + Some(RequestType::ActivateListener(_)) => "ActivateListener", + Some(RequestType::DeactivateListener(_)) => "DeactivateListener", + Some(RequestType::QueryClusterById(_)) => "QueryClusterById", + Some(RequestType::QueryClustersByDomain(_)) => "QueryClustersByDomain", + Some(RequestType::QueryClustersHashes(_)) => "QueryClustersHashes", + Some(RequestType::QueryMetrics(_)) => "QueryMetrics", + Some(RequestType::SoftStop(_)) => "SoftStop", + Some(RequestType::HardStop(_)) => "HardStop", + Some(RequestType::ConfigureMetrics(_)) => "ConfigureMetrics", + Some(RequestType::Logging(_)) => "Logging", + Some(RequestType::ReturnListenSockets(_)) => "ReturnListenSockets", + Some(RequestType::QueryCertificatesFromTheState(_)) => "QueryCertificatesFromTheState", + Some(RequestType::QueryCertificatesFromWorkers(_)) => "QueryCertificatesFromWorkers", + Some(RequestType::CountRequests(_)) => "CountRequests", + None => "Unallowed", + } + } } /// This is sent only from Sōzu to Sōzu @@ -139,9 +187,7 @@ pub fn read_requests_from_file(file: &mut File) -> Result, Re loop { let previous = buffer.available_data(); - let bytes_read = file - .read(buffer.space()) - .map_err(RequestError::FileError)?; + let bytes_read = file.read(buffer.space()).map_err(RequestError::FileError)?; buffer.fill(bytes_read); diff --git a/command/src/scm_socket.rs b/command/src/scm_socket.rs index ec91bce8c..1ea74135c 100644 --- a/command/src/scm_socket.rs +++ b/command/src/scm_socket.rs @@ -209,7 +209,7 @@ impl ScmSocket { } /// Socket addresses and file descriptors needed by a Proxy to start listening -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)] pub struct Listeners { pub http: Vec<(SocketAddr, RawFd)>, pub tls: Vec<(SocketAddr, RawFd)>, @@ -305,11 +305,7 @@ mod tests { let receiving_scm_socket = ScmSocket::new(stream_2.as_raw_fd()).expect("Could not create scm socket"); - let listeners = Listeners { - http: vec![], - tcp: vec![], - tls: vec![], - }; + let listeners = Listeners::default(); sending_scm_socket .send_listeners(&listeners) diff --git a/e2e/src/sozu/worker.rs b/e2e/src/sozu/worker.rs index 9dd30c6ff..1527a5115 100644 --- a/e2e/src/sozu/worker.rs +++ b/e2e/src/sozu/worker.rs @@ -51,18 +51,6 @@ pub fn set_no_close_exec(fd: i32) { } impl Worker { - pub fn empty_file_config() -> FileConfig { - FileConfig::default() - } - - pub fn empty_listeners() -> Listeners { - Listeners { - http: Vec::new(), - tls: Vec::new(), - tcp: Vec::new(), - } - } - pub fn into_config(file_config: FileConfig) -> Config { ConfigBuilder::new(file_config, "") .into_config() @@ -70,8 +58,8 @@ impl Worker { } pub fn empty_config() -> (Config, Listeners, ConfigState) { - let listeners = Worker::empty_listeners(); - let config = Worker::empty_file_config(); + let listeners = Listeners::default(); + let config = FileConfig::default(); let config = Worker::into_config(config); let state = ConfigState::new(); (config, listeners, state) diff --git a/e2e/src/tests/tests.rs b/e2e/src/tests/tests.rs index 65ae1cbfa..27e35737b 100644 --- a/e2e/src/tests/tests.rs +++ b/e2e/src/tests/tests.rs @@ -12,6 +12,7 @@ use sozu_command_lib::{ request::RequestType, ActivateListener, AddCertificate, CertificateAndKey, ListenerType, RemoveBackend, RequestHttpFrontend, }, + scm_socket::Listeners, state::ConfigState, }; @@ -171,9 +172,9 @@ pub fn try_backend_stop(nb_requests: usize, zombie: Option) -> State { let config = Worker::into_config(FileConfig { zombie_check_interval: zombie, - ..Worker::empty_file_config() + ..FileConfig::default() }); - let listeners = Worker::empty_listeners(); + let listeners = Listeners::default(); let state = ConfigState::new(); let (mut worker, mut backends) = setup_async_test( "BACKSTOP", diff --git a/lib/src/http.rs b/lib/src/http.rs index 3e52ad579..71504367a 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -1065,11 +1065,7 @@ pub fn start_http_worker( let server_scm_socket = ScmSocket::new(scm_server.as_raw_fd()).with_context(|| "Could not create scm socket")?; - if let Err(e) = client_scm_socket.send_listeners(&Listeners { - http: Vec::new(), - tls: Vec::new(), - tcp: Vec::new(), - }) { + if let Err(e) = client_scm_socket.send_listeners(&Listeners::default()) { error!("error sending empty listeners: {:?}", e); } diff --git a/lib/src/server.rs b/lib/src/server.rs index 803d070c6..fdfa60e96 100644 --- a/lib/src/server.rs +++ b/lib/src/server.rs @@ -28,7 +28,7 @@ use sozu_command::{ ready::Ready, request::WorkerRequest, response::{MessageId, WorkerResponse}, - scm_socket::{Listeners, ScmSocket}, + scm_socket::{Listeners, ScmSocket, ScmSocketError}, state::ConfigState, }; @@ -695,7 +695,13 @@ impl Server { } Some(RequestType::ReturnListenSockets(_)) => { info!("received ReturnListenSockets order"); - self.return_listen_sockets(); + match self.return_listen_sockets() { + Ok(_) => push_queue(WorkerResponse::ok(request.id.clone())), + Err(error) => push_queue(WorkerResponse::error( + request.id.clone(), + format!("Could not send listeners on scm socket: {error:?}"), + )), + } } _ => self.notify(request), }, @@ -1413,7 +1419,7 @@ impl Server { } /// Send all socket addresses and file descriptors of all proxies, via the scm socket - pub fn return_listen_sockets(&mut self) { + pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> { self.unblock_scm_socket(); let mut http_listeners = self.http.borrow_mut().give_back_listeners(); @@ -1464,6 +1470,7 @@ impl Server { self.block_scm_socket(); info!("sent default listeners: {:?}", res); + res } fn block_scm_socket(&mut self) { diff --git a/lib/src/tcp.rs b/lib/src/tcp.rs index 6732d3ff9..7e16999c0 100644 --- a/lib/src/tcp.rs +++ b/lib/src/tcp.rs @@ -1824,11 +1824,7 @@ mod tests { let server_scm_socket = ScmSocket::new(scm_server.as_raw_fd()).expect("Could not create scm socket"); client_scm_socket - .send_listeners(&Listeners { - http: Vec::new(), - tls: Vec::new(), - tcp: Vec::new(), - }) + .send_listeners(&Listeners::default()) .unwrap(); let server_config = server::ServerConfig {