Skip to content

Commit

Permalink
complete upgrade_worker
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Wonshtrum committed Jan 11, 2024
1 parent d77fa68 commit 1f8a991
Show file tree
Hide file tree
Showing 16 changed files with 233 additions and 159 deletions.
6 changes: 1 addition & 5 deletions bin/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,11 +617,7 @@ impl CommandServer {
incr!("worker_restart");

let new_worker_id = self.next_worker_id;
let listeners = Some(Listeners {
http: Vec::new(),
tls: Vec::new(),
tcp: Vec::new(),
});
let listeners = Some(Listeners::default());

let mut new_worker = start_worker(
new_worker_id,
Expand Down
6 changes: 1 addition & 5 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,7 @@ impl CommandServer {

info!(
"sending listeners: to the new worker: {:?}",
worker.scm_socket.send_listeners(&Listeners {
http: Vec::new(),
tls: Vec::new(),
tcp: Vec::new(),
})
worker.scm_socket.send_listeners(&Listeners::default())
);

let activate_requests = self.state.generate_activate_requests();
Expand Down
2 changes: 1 addition & 1 deletion bin/src/command_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn start_server(
let mut command_hub = CommandHub::new(unix_listener, config, executable_path)?;

for _ in 0..worker_count {
command_hub.launch_new_worker();
command_hub.launch_new_worker(None);
}

load_static_config(&mut command_hub.server, None, None);
Expand Down
12 changes: 3 additions & 9 deletions bin/src/command_v2/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,7 @@ fn set_logging_level(server: &mut Server, client: &mut ClientSession, logging_fi
);

// notify the workers too
worker_request(
server,
client,
RequestType::Logging(logging_filter.clone()).into(),
);
worker_request(server, client, RequestType::Logging(logging_filter));
}

fn subscribe_client_to_events(server: &mut Server, client: &mut ClientSession) {
Expand Down Expand Up @@ -389,7 +385,7 @@ pub fn load_static_config(
) {
let task_id = server.new_task(Box::new(LoadStaticConfigTask {
gatherer: DefaultGatherer::default(),
client_token: client.as_ref().map(|c| c.token.clone()),
client_token: client.as_ref().map(|c| c.token),
}));

let new_config;
Expand Down Expand Up @@ -852,15 +848,13 @@ impl GatheringTask for StatusTask {
struct StopTask {
pub client_token: Token,
pub gatherer: DefaultGatherer,
pub hardness: bool,
}

/// stop the main process and workers, true for hard stop
fn stop(server: &mut Server, client: &mut ClientSession, hardness: bool) {
let task = Box::new(StopTask {
client_token: client.token,
gatherer: DefaultGatherer::default(),
hardness,
});

server.stopping = true;
Expand All @@ -885,7 +879,7 @@ impl GatheringTask for StopTask {

fn on_finish(
self: Box<Self>,
server: &mut Server,
_server: &mut Server,
client: &mut ClientSession,
_timed_out: bool,
) {
Expand Down
41 changes: 24 additions & 17 deletions bin/src/command_v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl CommandHub {
);
if is_finished {
let task = self.tasks.remove(&task_id).unwrap();
debug!("Removed this task: {:#?}", task);
info!("Removed this task: {:#?}", task);
task.job.on_finish(&mut self.server, client, false);
self.handle_finish_task(task_id);
}
Expand All @@ -354,7 +354,7 @@ impl CommandHub {
.on_message_no_client(&mut self.server, worker_id, response);
if is_finished {
let task = self.tasks.remove(&task_id).unwrap();
debug!("Removed this task: {:#?}", task);
info!("Removed this task: {:#?}", task);
task.job.on_finish_no_client(&mut self.server, false);
self.handle_finish_task(task_id);
}
Expand Down Expand Up @@ -425,18 +425,14 @@ impl Server {

/// Forks main process into a new worker, register the worker in mio,
/// send a Status request to the worker
pub fn launch_new_worker(&mut self) -> &mut WorkerSession {
pub fn launch_new_worker(&mut self, listeners: Option<Listeners>) -> &mut WorkerSession {
let worker_id = self.next_worker_id();
let (worker_pid, main_to_worker_channel, main_to_worker_scm) = fork_main_into_worker(
&worker_id.to_string(),
&self.config,
self.executable_path.clone(),
&self.state,
Some(Listeners {
http: Vec::new(),
tls: Vec::new(),
tcp: Vec::new(),
}),
Some(listeners.unwrap_or_default()),
)
.expect("could not fork main into new worker");

Expand All @@ -448,7 +444,7 @@ impl Server {
);

worker_session.send(&WorkerRequest {
id: worker_id.to_string(),
id: format!("INITIAL-STATUS-{worker_id}"),
content: RequestType::Status(Status {}).into(),
});

Expand Down Expand Up @@ -510,7 +506,7 @@ impl Server {
) -> &mut WorkerSession {
let token = self.next_session_token();
self.register(token, &mut channel.sock);
let worker_session = self.workers.insert(
self.workers.insert(
token,
WorkerSession::new(channel, worker_id, pid, token, scm_socket),
);
Expand Down Expand Up @@ -556,10 +552,19 @@ impl Server {
};

for worker in self.workers.values_mut().filter(|w| {
(w.run_state == RunState::Running) && target.map(|id| id == w.id).unwrap_or(true)
target
.map(|id| id == w.id && w.run_state != RunState::Stopped)
.unwrap_or(w.run_state == RunState::Running)
}) {
worker_count += 1;
worker_request.id = format!("{}-{}-query-{}", task_id, request_id, worker.id);
worker_request.id = format!(
"{}-{}-{}-{}",
worker_request.content.short_name(),
worker.id,
task_id,
request_id,
);
info!("scattering on {} request: {:#?}", worker.id, worker_request);
worker.send(&worker_request);
self.in_flight.insert(worker_request.id, task_id);
}
Expand All @@ -578,10 +583,10 @@ impl Server {
/// - calls `close_worker`
/// - calls `start_worker` if needed
pub fn handle_worker_close(&mut self, token: &Token) {
let worker_id = match self.workers.get(token) {
let (worker_id, worker_run_state) = match self.workers.get(token) {
Some(worker) => {
info!("closing worker {:#?}", worker);
worker.id
(worker.id, worker.run_state)
}
None => {
error!("No worker exists with token {:?}", token);
Expand All @@ -591,9 +596,12 @@ impl Server {

self.close_worker(token);

if self.config.worker_automatic_restart && !self.stopping {
if self.config.worker_automatic_restart
&& worker_run_state == RunState::Running
&& !self.stopping
{
info!("Automatically restarting worker {}", worker_id);
self.launch_new_worker();
self.launch_new_worker(None);
}
}

Expand All @@ -605,7 +613,6 @@ impl Server {
return;
}
};
info!("closing worker {:?}", worker);

match kill(Pid::from_raw(worker.pid), Signal::SIGKILL) {
Ok(()) => info!("Worker {} was successfully killed", worker.id),
Expand Down
2 changes: 1 addition & 1 deletion bin/src/command_v2/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl WorkerSession {
RunState::Running | RunState::NotAnswering => RunState::NotAnswering,
};
WorkerInfo {
id: self.id as u32,
id: self.id,
pid: self.pid,
run_state: run_state as i32,
}
Expand Down
Loading

0 comments on commit 1f8a991

Please sign in to comment.