Skip to content

Commit

Permalink
draft: implement upgrade_main
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Jan 11, 2024
1 parent 1f8a991 commit 274a6c0
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 8 deletions.
2 changes: 1 addition & 1 deletion bin/src/command_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::command_v2::{
mod requests;
mod server;
mod sessions;
mod upgrade;
pub mod upgrade;

pub fn start_server(
config: Config,
Expand Down
4 changes: 3 additions & 1 deletion bin/src/command_v2/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::command_v2::{
ClientSession,
};

use super::upgrade::upgrade_main;

trait MessageClient {
fn finish_ok<T: Into<String>>(&mut self, content: Option<ResponseContent>, message: T);
fn finish_failure<T: Into<String>>(&mut self, message: T);
Expand Down Expand Up @@ -64,7 +66,7 @@ impl Server {
RequestType::ListFrontends(inner) => list_frontend_command(self, client, inner),
RequestType::ListListeners(_) => list_listeners(self, client),
RequestType::LaunchWorker(_) => todo!(), // TODO: this is never used, for now
RequestType::UpgradeMain(_) => todo!(),
RequestType::UpgradeMain(_) => upgrade_main(self, client),
RequestType::UpgradeWorker(worker_id) => upgrade_worker(self, client, worker_id),
RequestType::SubscribeEvents(_) => subscribe_client_to_events(self, client),
RequestType::ReloadConfiguration(path) => {
Expand Down
44 changes: 41 additions & 3 deletions bin/src/command_v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
collections::{HashMap, HashSet},
fmt::Debug,
ops::{Deref, DerefMut},
os::fd::AsRawFd,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -30,12 +31,16 @@ use sozu_command_lib::{
};

use crate::{
command_v2::sessions::{
wants_to_tick, ClientResult, ClientSession, WorkerResult, WorkerSession,
command_v2::{
sessions::{wants_to_tick, ClientResult, ClientSession, WorkerResult, WorkerSession},
upgrade::UpgradeData,
},
util,
worker::fork_main_into_worker,
};

use super::sessions::SerializedWorkerSession;

pub type WorkerId = u32;
/// Gather messages and notifies when there are no more to read.
#[allow(unused)]
Expand Down Expand Up @@ -373,7 +378,7 @@ pub struct Server {
pub config: Config,
/// Sōzu clients that subscribed to events
pub event_subscribers: HashSet<Token>,
executable_path: String,
pub executable_path: String,
frontends_count: usize,
in_flight: HashMap<String, usize>,
next_client_id: u32, // TODO: create a ClientId type
Expand Down Expand Up @@ -620,4 +625,37 @@ impl Server {
}
worker.run_state = RunState::Stopped;
}

// TODO: return result, propagate errors
pub fn disable_cloexec_before_upgrade(&mut self) {
for (token, worker) in self.workers.iter_mut() {
if worker.run_state == RunState::Running {
let _ = util::disable_close_on_exec(worker.channel.fd()).map_err(|e| {
error!(
"could not disable close on exec for worker {}: {}",
worker.id, e
);
});
}
}
trace!(
"disabling cloexec on listener with file descriptor: {}",
self.unix_listener.as_raw_fd()
);
util::disable_close_on_exec(self.unix_listener.as_raw_fd())
.expect("please propagate this error");
}

pub fn generate_upgrade_data(&self) -> UpgradeData {
UpgradeData {
command_socket_fd: self.unix_listener.as_raw_fd(),
config: self.config.clone(),
workers: self
.workers
.values()
.map(SerializedWorkerSession::from)
.collect(),
state: self.state.clone(),
}
}
}
26 changes: 25 additions & 1 deletion bin/src/command_v2/sessions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, os::fd::{IntoRawFd, AsRawFd}};

Check warning on line 1 in bin/src/command_v2/sessions.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused import: `IntoRawFd`

use libc::pid_t;
use mio::Token;
Expand Down Expand Up @@ -190,6 +190,30 @@ impl WorkerSession {
}
}

/// Flat description of a worker session,
/// meant to be passed to a new main process during an upgrade.
///
/// It is built from a `&WorkerSession` through the `From` implementation below,
/// and holds raw file descriptors instead of the channel and SCM socket themselves.
// NOTE(review): only `Debug` is derived here, no serialization trait is visible
// in this diff — confirm how this struct is meant to reach the new main process.
#[derive(Debug)]
pub struct SerializedWorkerSession {
/// file descriptor of the UNIX channel
pub channel_fd: i32,
/// process id of the worker, copied from the session
pub pid: pid_t,
/// identifier of the worker, copied from the session
pub id: WorkerId,
/// run state of the worker at the time of the conversion
pub run_state: RunState,
/// file descriptor of the SCM socket
pub scm_fd: i32,
}

impl From<&WorkerSession> for SerializedWorkerSession {
fn from(value: &WorkerSession) -> Self {
Self {
channel_fd: value.channel.sock.as_raw_fd(),
pid: value.pid,
id: value.id,
run_state: value.run_state,
scm_fd: value.scm_socket.raw_fd(),
}
}
}

pub fn extract_messages<Tx, Rx>(channel: &mut Channel<Tx, Rx>) -> Vec<Rx>
where
Tx: Debug + Serialize,
Expand Down
51 changes: 50 additions & 1 deletion bin/src/command_v2/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use std::process::exit;

use mio::Token;
use sozu_command_lib::{
config::Config,
proto::command::{request::RequestType, ResponseStatus, ReturnListenSockets, SoftStop},
response::WorkerResponse,
state::ConfigState,
};

use crate::upgrade::fork_main_into_new_main;

use super::{
server::{Gatherer, GatheringTask, Server, WorkerId},
sessions::ClientSession,
sessions::{ClientSession, SerializedWorkerSession},
};

#[derive(Debug)]
Expand Down Expand Up @@ -228,3 +234,46 @@ impl Gatherer for UpgradeWorkerTask {
self.ok + self.errors >= self.expected_responses
}
}

//===============================================
// Upgrade the main process

/// Data handed to `fork_main_into_new_main` when the main process is upgraded.
///
/// It is produced by `Server::generate_upgrade_data`.
#[derive(Debug)]
pub struct UpgradeData {
/// file descriptor of the unix command socket
pub command_socket_fd: i32,
/// clone of the configuration of the main process
pub config: Config,
/// one entry per worker session known to the main process,
/// with sockets replaced by their raw file descriptors
// NOTE(review): these are typed values, not JSON strings; no JSON conversion
// is visible in this diff — confirm where serialization happens.
pub workers: Vec<SerializedWorkerSession>,
/// clone of the `ConfigState` held by the main process
pub state: ConfigState,
}

pub fn upgrade_main(server: &mut Server, client: &mut ClientSession) {
server.disable_cloexec_before_upgrade();

client.return_processing("Upgrading...");

let upgrade_data = server.generate_upgrade_data();

let (new_main_pid, mut fork_confirmation_channel) =
fork_main_into_new_main(server.executable_path.clone(), upgrade_data)
.expect("Could not start a new main process");

if let Err(e) = fork_confirmation_channel.blocking() {
error!(
"Could not block the fork confirmation channel: {}. This is not normal, you may need to restart sozu",
e
);
}

let received_ok_from_new_process = fork_confirmation_channel
.read_message()
.expect("could not receive confirmation of the fork");
debug!("upgrade channel sent {:?}", received_ok_from_new_process);

if !received_ok_from_new_process {
client.finish_failure("Upgrade of main process failed: no feedback from the new main");
} else {
exit(0)
}
}
4 changes: 3 additions & 1 deletion bin/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use sozu_command_lib::{
request::WorkerRequest, state::ConfigState,
};

use crate::{command::CommandServer, util, worker::Worker};
use crate::{command::CommandServer, util, worker::Worker, command_v2::upgrade::UpgradeData};

Check warning on line 23 in bin/src/upgrade.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused import: `worker::Worker`

#[derive(Deserialize, Serialize, Debug)]
pub struct SerializedWorker {
Expand All @@ -32,6 +32,7 @@ pub struct SerializedWorker {
pub scm: i32,
}

/*
impl SerializedWorker {
pub fn from_worker(worker: &Worker) -> SerializedWorker {
SerializedWorker {
Expand All @@ -56,6 +57,7 @@ pub struct UpgradeData {
pub state: ConfigState,
pub next_id: u32,
}
*/

/// unix-forks the main process
///
Expand Down
2 changes: 2 additions & 0 deletions bin/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use nix::fcntl::{fcntl, FcntlArg, FdFlag};
use sozu::metrics;
use sozu_command_lib::config::Config;

// TODO: create error type with thiserror for this module

pub fn enable_close_on_exec(fd: RawFd) -> Result<i32, anyhow::Error> {
let file_descriptor =
fcntl(fd, FcntlArg::F_GETFD).with_context(|| "could not get file descriptor flags")?;
Expand Down

0 comments on commit 274a6c0

Please sign in to comment.