Skip to content

Commit

Permalink
Use OptionalClient in gatherer and tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Wonshtrum committed Jan 15, 2024
1 parent 56f48b1 commit 20a8d87
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 286 deletions.
4 changes: 1 addition & 3 deletions bin/src/command_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use mio::net::UnixListener;

use sozu_command_lib::config::Config;

use crate::command_v2::{
requests::load_static_config, server::CommandHub, sessions::ClientSession,
};
use crate::command_v2::{requests::load_static_config, server::CommandHub};

mod requests;
pub mod server;
Expand Down
189 changes: 52 additions & 137 deletions bin/src/command_v2/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,11 @@ use sozu_command_lib::{
use sozu_lib::metrics::METRICS;

use crate::command_v2::{
server::{DefaultGatherer, Gatherer, GatheringTask, Server},
upgrade::upgrade_worker,
ClientSession,
server::{DefaultGatherer, Gatherer, GatheringTask, MessageClient, Server, WorkerId},
sessions::{ClientSession, OptionalClient},
upgrade::{upgrade_main, upgrade_worker},
};

use super::{server::WorkerId, upgrade::upgrade_main};

trait MessageClient {
fn finish_ok<T: Into<String>>(&mut self, content: Option<ResponseContent>, message: T);
fn finish_failure<T: Into<String>>(&mut self, message: T);
fn return_processing<T: Into<String>>(&mut self, message: T);
}

impl MessageClient for Option<&mut ClientSession> {
fn finish_ok<T: Into<String>>(&mut self, content: Option<ResponseContent>, message: T) {
match self {
None => info!("{}", message.into()),
Some(client) => client.finish_ok(content, message),
}
}
fn finish_failure<T: Into<String>>(&mut self, message: T) {
match self {
None => error!("{}", message.into()),
Some(client) => client.finish_failure(message),
}
}
fn return_processing<T: Into<String>>(&mut self, message: T) {
match self {
None => info!("{}", message.into()),
Some(client) => client.return_processing(message),
}
}
}

impl Server {
pub fn handle_client_request(&mut self, client: &mut ClientSession, request: Request) {
let request_type = request.request_type.unwrap();
Expand Down Expand Up @@ -105,57 +76,27 @@ impl Server {
RequestType::SoftStop(_) => stop(self, client, false),
RequestType::HardStop(_) => stop(self, client, true),
RequestType::Logging(logging_filter) => set_logging_level(self, client, logging_filter),
RequestType::ReturnListenSockets(_) => todo!(), // no need to implement this
RequestType::QueryCertificatesFromTheState(filters) => {
query_certificates_from_main(self, client, filters)
}

RequestType::CountRequests(_) => count_requests(self, client),

RequestType::ReturnListenSockets(_) => {} // This is only implemented by workers,
}
}

fn query_main(&self, request: &RequestType) -> Result<Option<ResponseContent>, ()> {
fn query_main(&self, request: RequestType) -> Result<Option<ResponseContent>, ()> {
match request {
// RequestType::SaveState(_) => todo!(),
// RequestType::LoadState(_) => todo!(),
// RequestType::ListWorkers(_) => todo!(),
// RequestType::ListFrontends(_) => todo!(),
// RequestType::ListListeners(_) => todo!(),
// RequestType::LaunchWorker(_) => todo!(),
// RequestType::UpgradeMain(_) => todo!(),
// RequestType::UpgradeWorker(_) => todo!(),
// RequestType::SubscribeEvents(_) => todo!(),
// RequestType::ReloadConfiguration(_) => todo!(),
// RequestType::Status(_) => todo!(),
// RequestType::AddCluster(_) => todo!(),
// RequestType::RemoveCluster(_) => todo!(),
// RequestType::AddHttpFrontend(_) => todo!(),
// RequestType::RemoveHttpFrontend(_) => todo!(),
// RequestType::AddHttpsFrontend(_) => todo!(),
// RequestType::RemoveHttpsFrontend(_) => todo!(),
// RequestType::AddCertificate(_) => todo!(),
// RequestType::ReplaceCertificate(_) => todo!(),
// RequestType::RemoveCertificate(_) => todo!(),
// RequestType::AddTcpFrontend(_) => todo!(),
// RequestType::RemoveTcpFrontend(_) => todo!(),
// RequestType::AddBackend(_) => todo!(),
// RequestType::RemoveBackend(_) => todo!(),
// RequestType::AddHttpListener(_) => todo!(),
// RequestType::AddHttpsListener(_) => todo!(),
// RequestType::AddTcpListener(_) => todo!(),
// RequestType::RemoveListener(_) => todo!(),
// RequestType::ActivateListener(_) => todo!(),
// RequestType::DeactivateListener(_) => todo!(),
RequestType::QueryClusterById(cluster_id) => Ok(Some(
ContentType::Clusters(ClusterInformations {
vec: self.state.cluster_state(cluster_id).into_iter().collect(),
vec: self.state.cluster_state(&cluster_id).into_iter().collect(),
})
.into(),
)),
RequestType::QueryClustersByDomain(domain) => {
let cluster_ids = self
.state
.get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
.get_cluster_ids_by_domain(domain.hostname, domain.path);
let vec = cluster_ids
.iter()
.filter_map(|cluster_id| self.state.cluster_state(cluster_id))
Expand All @@ -170,15 +111,6 @@ impl Server {
})
.into(),
)),
// RequestType::QueryMetrics(_) => todo!(),
// RequestType::SoftStop(_) => todo!(),
// RequestType::HardStop(_) => todo!(),
// RequestType::ConfigureMetrics(_) => todo!(),
// RequestType::Logging(_) => todo!(),
// RequestType::ReturnListenSockets(_) => todo!(),
// RequestType::QueryCertificatesFromTheState(filters) => todo!(),
// RequestType::QueryCertificatesFromWorkers(_) => todo!(),
// RequestType::CountRequests(_) => todo!(),
_ => Ok(None),
}
}
Expand Down Expand Up @@ -223,7 +155,7 @@ pub fn list_frontend_command(
filters: FrontendFilters,
) {
let response = server
.query_main(&RequestType::ListFrontends(filters))
.query_main(RequestType::ListFrontends(filters))
.unwrap();
client.finish_ok(response, "Successfully listed frontends");
}
Expand Down Expand Up @@ -320,7 +252,7 @@ pub fn query_clusters(
Box::new(QueryClustersTask {
client_token: client.token,
gatherer: DefaultGatherer::default(),
main_process_response: server.query_main(&request_content).unwrap(),
main_process_response: server.query_main(request_content.clone()).unwrap(),
request_type: request_content,
}),
None,
Expand All @@ -339,8 +271,8 @@ impl GatheringTask for QueryClustersTask {
fn on_finish(
self: Box<Self>,
_server: &mut Server,
client: &mut ClientSession,
_timedout: bool,
client: &mut OptionalClient,
_timed_out: bool,
) {
let mut worker_responses: BTreeMap<String, ResponseContent> = self
.gatherer
Expand Down Expand Up @@ -378,11 +310,7 @@ struct LoadStaticConfigTask {
client_token: Option<Token>,
}

pub fn load_static_config(
server: &mut Server,
mut client: Option<&mut ClientSession>,
path: Option<&str>,
) {
pub fn load_static_config(server: &mut Server, mut client: OptionalClient, path: Option<&str>) {
let task_id = server.new_task(Box::new(LoadStaticConfigTask {
gatherer: DefaultGatherer::default(),
client_token: client.as_ref().map(|c| c.token),
Expand Down Expand Up @@ -425,8 +353,22 @@ pub fn load_static_config(
server.scatter_on(request, task_id, request_index, None);
}
}
impl LoadStaticConfigTask {
fn finish(self, server: &mut Server, mut client: Option<&mut ClientSession>, _timed_out: bool) {

impl GatheringTask for LoadStaticConfigTask {
fn client_token(&self) -> Option<Token> {
self.client_token
}

fn get_gatherer(&mut self) -> &mut dyn Gatherer {
&mut self.gatherer
}

fn on_finish(
self: Box<Self>,
server: &mut Server,
client: &mut OptionalClient,
_timed_out: bool,
) {
let mut messages = vec![];
for (worker_id, response) in self.gatherer.responses {
match response.status {
Expand Down Expand Up @@ -459,29 +401,6 @@ impl LoadStaticConfigTask {
}
}

impl GatheringTask for LoadStaticConfigTask {
fn client_token(&self) -> Option<Token> {
self.client_token
}

fn get_gatherer(&mut self) -> &mut dyn Gatherer {
&mut self.gatherer
}

fn on_finish(
self: Box<Self>,
server: &mut Server,
client: &mut ClientSession,
timed_out: bool,
) {
self.finish(server, Some(client), timed_out);
}

fn on_finish_no_client(self: Box<Self>, server: &mut Server, timed_out: bool) {
self.finish(server, None, timed_out);
}
}

// =========================================================
// Worker request

Expand Down Expand Up @@ -528,8 +447,8 @@ impl GatheringTask for WorkerTask {
fn on_finish(
self: Box<Self>,
_server: &mut Server,
client: &mut ClientSession,
_timedout: bool,
client: &mut OptionalClient,
_timed_out: bool,
) {
let mut messages = vec![];

Expand Down Expand Up @@ -587,8 +506,8 @@ impl GatheringTask for QueryMetricsTask {
fn on_finish(
self: Box<Self>,
_server: &mut Server,
client: &mut ClientSession,
_timedout: bool,
client: &mut OptionalClient,
_timed_out: bool,
) {
let main_metrics =
METRICS.with(|metrics| (*metrics.borrow_mut()).dump_local_proxy_metrics());
Expand Down Expand Up @@ -659,7 +578,7 @@ struct LoadStateTask {
path: String,
}

pub fn load_state(server: &mut Server, mut client: Option<&mut ClientSession>, path: &str) {
pub fn load_state(server: &mut Server, mut client: OptionalClient, path: &str) {
let mut file = match File::open(path) {
Ok(file) => file,
Err(err) if matches!(err.kind(), ErrorKind::NotFound) => {
Expand Down Expand Up @@ -740,8 +659,21 @@ pub fn load_state(server: &mut Server, mut client: Option<&mut ClientSession>, p
}
}

impl LoadStateTask {
fn finish(self, _server: &mut Server, mut client: Option<&mut ClientSession>, _timedout: bool) {
impl GatheringTask for LoadStateTask {
fn client_token(&self) -> Option<Token> {
self.client_token
}

fn get_gatherer(&mut self) -> &mut dyn Gatherer {
&mut self.gatherer
}

fn on_finish(
self: Box<Self>,
_server: &mut Server,
client: &mut OptionalClient,
_timed_out: bool,
) {
let DefaultGatherer { ok, errors, .. } = self.gatherer;
if errors == 0 {
client.finish_ok(
Expand All @@ -757,23 +689,6 @@ impl LoadStateTask {
}
}

impl GatheringTask for LoadStateTask {
fn client_token(&self) -> Option<Token> {
self.client_token
}

fn get_gatherer(&mut self) -> &mut dyn Gatherer {
&mut self.gatherer
}

fn on_finish(self: Box<Self>, server: &mut Server, client: &mut ClientSession, timedout: bool) {
self.finish(server, Some(client), timedout);
}
fn on_finish_no_client(self: Box<Self>, server: &mut Server, timedout: bool) {
self.finish(server, None, timedout);
}
}

// ==========================================================
// status

Expand Down Expand Up @@ -816,8 +731,8 @@ impl GatheringTask for StatusTask {
fn on_finish(
mut self: Box<Self>,
_server: &mut Server,
client: &mut ClientSession,
_timedout: bool,
client: &mut OptionalClient,
_timed_out: bool,
) {
for (worker_id, response) in self.gatherer.responses {
let new_run_state = match response.status {
Expand Down Expand Up @@ -881,7 +796,7 @@ impl GatheringTask for StopTask {
fn on_finish(
self: Box<Self>,
_server: &mut Server,
client: &mut ClientSession,
client: &mut OptionalClient,
_timed_out: bool,
) {
client.finish_ok(
Expand Down
Loading

0 comments on commit 20a8d87

Please sign in to comment.