diff --git a/agdb_server/src/cluster.rs b/agdb_server/src/cluster.rs index 7009ece7e..b7640253a 100644 --- a/agdb_server/src/cluster.rs +++ b/agdb_server/src/cluster.rs @@ -5,6 +5,8 @@ use agdb::StableHash; use agdb_api::HttpClient; use agdb_api::ReqwestClient; use axum::http::StatusCode; +use serde::Serialize; +use std::fmt::Display; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -55,6 +57,40 @@ pub(crate) struct ClusterImpl { pub(crate) data: RwLock, } +#[derive(Serialize)] +pub(crate) enum ClusterOperation { + Heartbeat, + Election, + Leader, + Follower, + Vote, +} + +#[derive(Serialize)] +pub(crate) enum ClusterResult { + Success, + Reject, + Failure, +} + +#[derive(Serialize)] +pub(crate) struct ClusterLog { + pub(crate) node: usize, + pub(crate) operation: ClusterOperation, + pub(crate) target_node: usize, + pub(crate) term: u64, + pub(crate) result: ClusterResult, + pub(crate) status: u16, + pub(crate) time: u128, + pub(crate) message: String, +} + +impl Display for ClusterLog { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&serde_json::to_string(self).unwrap_or_default()) + } +} + impl ClusterImpl { pub(crate) async fn leader(&self) -> Option { match self.data.read().await.state { @@ -215,27 +251,44 @@ async fn heartbeat(cluster: &Cluster, shutdown_signal: Arc) -> Serve tokio::spawn(async move { while is_leader.load(Ordering::Relaxed) && !shutdown_signal.load(Ordering::Relaxed) { + let timer = Instant::now(); + match node.heartbeat(cluster_hash, term, leader).await { Ok((status, message)) => { if status != 200 { - tracing::warn!( - "[{cluster_index}] Heartbeat rejected by {}: ({}) {}", - node.index, + let log = ClusterLog { + node: cluster_index, + operation: ClusterOperation::Heartbeat, + target_node: node.index, + term, + result: ClusterResult::Reject, status, - message - ); + time: timer.elapsed().as_micros(), + message, + }; + tracing::warn!("{log}"); } } Err(e) => { - let message = format!( - "[{cluster_index}] Heartbeat error on node {}: ({}) {}", - node.index, e.status, e.description - ); + let log = ClusterLog { + node: cluster_index, + operation: ClusterOperation::Heartbeat, + target_node: node.index, + term, + result: if e.status.is_client_error() { + ClusterResult::Reject + } else { + ClusterResult::Failure + }, + status: e.status.as_u16(), + time: timer.elapsed().as_micros(), + message: e.description, + }; if e.status.is_client_error() { - tracing::warn!(message); + tracing::warn!("{log}"); } else { - tracing::error!(message); + tracing::error!("{log}"); } } } @@ -261,9 +314,22 @@ async fn election(cluster: &Cluster) -> ServerResult<()> { let cluster_hash = cluster.cluster_hash; let index = cluster.index; - let quorum = (cluster.nodes.len() + 1) / 2 + 1; - - tracing::info!("[{index}] Starting election (cluster: {cluster_hash}, term: {election_term}, quorum: {quorum}/{})", cluster.nodes.len() + 1); + let cluster_len = cluster.nodes.len() + 1; + let quorum = cluster_len / 2 + 1; + + let log = ClusterLog { + node: index, + operation: ClusterOperation::Election, + target_node: index, + term: election_term, + result: ClusterResult::Success, + status: 0, + time: timer.elapsed().as_micros(), + message: format!( + "Starting election (cluster: {cluster_hash}, quorum: {quorum}/{cluster_len})" + ), + }; + tracing::info!("{log}"); let votes = Arc::new(AtomicUsize::new(1)); let voted = Arc::new(AtomicUsize::new(1)); @@ -276,18 +342,31 @@ async fn election(cluster: &Cluster) -> ServerResult<()> { tokio::spawn(async move { match node.vote(cluster_hash, election_term, index).await { - Ok(_) => { - tracing::info!( - "[{}] Vote for term {election_term} ACCEPTED by {}", - cluster.index, - node.index - ); + Ok((status, message)) => { + let log = ClusterLog { + node: cluster.index, + operation: ClusterOperation::Vote, + target_node: node.index, + term: election_term, + result: ClusterResult::Success, + time: timer.elapsed().as_micros(), + status, + message, + }; + tracing::info!("{log}"); if (votes.fetch_add(1, Ordering::Relaxed) + 1) == quorum { - tracing::info!( - "[{index}] Elected as leader for term {election_term} ({}ms)", - timer.elapsed().as_millis() - ); + let log = ClusterLog { + node: cluster.index, + operation: ClusterOperation::Leader, + target_node: cluster.index, + term: election_term, + result: ClusterResult::Success, + status: 0, + time: timer.elapsed().as_micros(), + message: "Elected as leader".to_string(), + }; + tracing::info!("{log}"); let mut data = cluster.data.write().await; data.state = ClusterState::LeaderElect; @@ -297,22 +376,25 @@ async fn election(cluster: &Cluster) -> ServerResult<()> { } } Err(e) => { + let log = ClusterLog { + node: cluster.index, + operation: ClusterOperation::Vote, + target_node: node.index, + term: election_term, + result: if e.status.is_client_error() { + ClusterResult::Reject + } else { + ClusterResult::Failure + }, + time: timer.elapsed().as_micros(), + status: e.status.as_u16(), + message: e.description, + }; + if e.status.is_client_error() { - tracing::warn!( - "[{}] Vote for term {election_term} REJECTED by {}: ({}) {}", - cluster.index, - node.index, - e.status, - e.description - ); + tracing::warn!("{log}"); } else { - tracing::error!( - "[{}] Vote for term {election_term} FAILED on {}: ({}) {}", - cluster.index, - node.index, - e.status, - e.description - ); + tracing::error!("{log}"); } } } @@ -321,13 +403,20 @@ async fn election(cluster: &Cluster) -> ServerResult<()> { let is_leader = cluster.data.read().await.leader.load(Ordering::Relaxed); if !is_leader { - tracing::warn!( - "[{index}] Election for term {election_term} failed - {}/{} (quorum: {quorum}/{}) ({}ms)", - votes.load(Ordering::Relaxed), - cluster.nodes.len() + 1, - cluster.nodes.len() + 1, - timer.elapsed().as_millis(), - ); + let log = ClusterLog { + node: cluster.index, + operation: ClusterOperation::Election, + target_node: cluster.index, + term: election_term, + result: ClusterResult::Reject, + status: 0, + time: timer.elapsed().as_micros(), + message: format!( + "Election for term {election_term} failed - {}/{cluster_len} (quorum: {quorum}/{cluster_len})", + votes.load(Ordering::Relaxed) + ), + }; + tracing::warn!("{log}"); let mut data = cluster.data.write().await; data.state = ClusterState::Election; @@ -340,6 +429,58 @@ async fn election(cluster: &Cluster) -> ServerResult<()> { Ok(()) } +pub(crate) async fn become_follower( + cluster: &Cluster, + term: u64, + leader: usize, +) -> ServerResult<()> { + let mut data = cluster.data.write().await; + data.term = term; + data.state = ClusterState::Follower(leader); + data.leader.store(false, Ordering::Relaxed); + + let time = data.timer; + data.timer = Instant::now(); + + let log = ClusterLog { + node: cluster.index, + operation: ClusterOperation::Follower, + target_node: leader, + term, + result: ClusterResult::Success, + status: StatusCode::OK.as_u16(), + time: time.elapsed().as_micros(), + message: "Becoming follower".to_string(), + }; + tracing::info!("{}", log); + + Ok(()) +} + +pub(crate) async fn vote(cluster: &Cluster, term: u64, leader: usize) -> ServerResult<()> { + let mut data = cluster.data.write().await; + data.state = ClusterState::Voted; + data.term = term; + data.voted = term; + + let time = data.timer; + data.timer = Instant::now(); + + let log = ClusterLog { + node: cluster.index, + operation: ClusterOperation::Vote, + target_node: leader, + term, + result: ClusterResult::Success, + status: StatusCode::OK.as_u16(), + time: time.elapsed().as_micros(), + message: "Vote cast".to_string(), + }; + tracing::info!("{}", log); + + Ok(()) +} + pub(crate) async fn start_with_shutdown( cluster: Cluster, mut shutdown_receiver: broadcast::Receiver<()>, diff --git a/agdb_server/src/config.rs b/agdb_server/src/config.rs index b824e6cd1..e91b0a287 100644 --- a/agdb_server/src/config.rs +++ b/agdb_server/src/config.rs @@ -22,11 +22,19 @@ pub(crate) struct ConfigImpl { pub(crate) data_dir: String, pub(crate) cluster_token: String, pub(crate) cluster: Vec, + #[serde(skip)] + pub(crate) cluster_node_id: usize, } pub(crate) fn new() -> ServerResult { if let Ok(content) = std::fs::read_to_string(CONFIG_FILE) { - let config = Config::new(serde_yaml::from_str(&content)?); + let mut config_impl: ConfigImpl = serde_yaml::from_str(&content)?; + config_impl.cluster_node_id = config_impl + .cluster + .iter() + .position(|x| x == &config_impl.address) + .unwrap_or(0); + let config = Config::new(config_impl); if !config.cluster.is_empty() && !config.cluster.contains(&config.address) { return Err(ServerError::from(format!( @@ -47,6 +55,7 @@ pub(crate) fn new() -> ServerResult { data_dir: "agdb_server_data".to_string(), cluster_token: "cluster".to_string(), cluster: vec![], + cluster_node_id: 0, }; std::fs::write(CONFIG_FILE, serde_yaml::to_string(&config)?)?; diff --git a/agdb_server/src/logger.rs b/agdb_server/src/logger.rs index 74c30429e..5c43632d7 100644 --- a/agdb_server/src/logger.rs +++ b/agdb_server/src/logger.rs @@ -16,6 +16,7 @@ use std::time::Instant; #[derive(Default, Serialize)] struct LogRecord { + node: usize, method: String, version: String, user: String, @@ -66,6 +67,7 @@ async fn request_log( log_record: &mut LogRecord, skip_body: bool, ) -> Result { + log_record.node = state.config.cluster_node_id; log_record.method = request.method().to_string(); log_record.uri = request.uri().to_string(); log_record.version = format!("{:?}", request.version()); @@ -166,6 +168,7 @@ mod tests { fn log_record(uri: &str, request_body: &str) -> LogRecord { LogRecord { + node: 0, method: "GET".to_string(), uri: uri.to_string(), version: "HTTP/1.1".to_string(), @@ -192,6 +195,7 @@ mod tests { #[test] fn log_error_test() { let log_record = LogRecord { + node: 0, method: "GET".to_string(), uri: "/".to_string(), version: "HTTP/1.1".to_string(), diff --git a/agdb_server/src/routes/cluster.rs b/agdb_server/src/routes/cluster.rs index dfdeb3789..edd6bbfb2 100644 --- a/agdb_server/src/routes/cluster.rs +++ b/agdb_server/src/routes/cluster.rs @@ -1,3 +1,4 @@ +use crate::cluster; use crate::cluster::Cluster; use crate::cluster::ClusterState; use crate::config::Config; @@ -10,8 +11,6 @@ use axum::extract::State; use axum::http::StatusCode; use axum::Json; use serde::Deserialize; -use std::sync::atomic::Ordering; -use std::time::Instant; #[derive(Deserialize)] pub(crate) struct ClusterParams { @@ -55,18 +54,7 @@ pub(crate) async fn heartbeat( } } - tracing::info!( - "[{}] Becoming a follower of node {}, term: {}", - cluster.index, - request.leader, - request.term - ); - - let mut data = cluster.data.write().await; - data.term = request.term; - data.state = ClusterState::Follower(request.leader); - data.leader.store(false, Ordering::Relaxed); - data.timer = Instant::now(); + cluster::become_follower(&cluster, request.term, request.leader).await?; Ok((StatusCode::OK, Json(String::new()))) } @@ -119,11 +107,7 @@ pub(crate) async fn vote( )); } - let mut data = cluster.data.write().await; - data.state = ClusterState::Voted; - data.term = request.term; - data.voted = request.term; - data.timer = Instant::now(); + cluster::vote(&cluster, request.term, request.leader).await?; Ok((StatusCode::OK, Json(String::new()))) } diff --git a/agdb_server/tests/cluster/cluster_test.rs b/agdb_server/tests/cluster/cluster_test.rs index b9190a7ea..9c2ae5d55 100644 --- a/agdb_server/tests/cluster/cluster_test.rs +++ b/agdb_server/tests/cluster/cluster_test.rs @@ -74,7 +74,7 @@ async fn create_cluster( } #[tokio::test] -#[ignore] +#[ignore = "Unstable on GitHub runners when run with coverage enabled"] async fn cluster_rebalance() -> anyhow::Result<()> { let mut servers = create_cluster(3).await?; diff --git a/agdb_web/pages/docs/references/server.en-US.md b/agdb_web/pages/docs/references/server.en-US.md index 12b5d1f3a..8531a274b 100644 --- a/agdb_web/pages/docs/references/server.en-US.md +++ b/agdb_web/pages/docs/references/server.en-US.md @@ -28,6 +28,7 @@ address: "localhost:3000" # address the incoming connections will come from basepath: "" # base path to append to the address in case the server is to be run behind a reverse proxy admin: admin # the admin user that will be created automatically for the server, the password will be the same as name (admin by default, recommended to change after startup) data_dir: agdb_server_data # directory to store user data +log_level: INFO # Options are: OFF, ERROR, WARN, INFO, DEBUG, TRACE ``` You can prepare it in advance in a file `agdb_server.yaml`. After the server database is created changes to the `admin` field will have no effect but the other settings can be changed later. All config changes require server restart to take effect.