From 67c00330a157a4459d66a7faa33aaf58043a81a3 Mon Sep 17 00:00:00 2001 From: Michael Vlach Date: Thu, 5 Sep 2024 21:45:25 +0200 Subject: [PATCH] [server] Add cluster election #1225 (#1230) * Update cluster_test.rs * working cluster * tweaks * Update agdb_server.yaml --- .github/workflows/agdb_server.yaml | 7 +- .vscode/tasks.json | 18 +- agdb/src/query.rs | 2 +- agdb_api/rust/src/api_error.rs | 2 +- agdb_api/rust/src/api_types.rs | 32 +- agdb_server/src/api.rs | 162 ++++++++++ agdb_server/src/app.rs | 3 +- agdb_server/src/cluster.rs | 309 ++++++++++++++++++- agdb_server/src/db_pool/server_db_storage.rs | 6 +- agdb_server/src/error_code.rs | 18 ++ agdb_server/src/logger.rs | 4 +- agdb_server/src/main.rs | 163 ---------- agdb_server/src/routes/cluster.rs | 197 ++++++++++-- agdb_server/src/server_error.rs | 2 +- agdb_server/src/server_state.rs | 6 + agdb_server/tests/cluster/cluster_test.rs | 138 +++++++++ agdb_server/tests/cluster/mod.rs | 1 + agdb_server/tests/routes/cluster_test.rs | 91 ------ agdb_server/tests/routes/mod.rs | 1 - agdb_server/tests/test_server.rs | 1 + 20 files changed, 852 insertions(+), 311 deletions(-) create mode 100644 agdb_server/tests/cluster/cluster_test.rs create mode 100644 agdb_server/tests/cluster/mod.rs delete mode 100644 agdb_server/tests/routes/cluster_test.rs diff --git a/.github/workflows/agdb_server.yaml b/.github/workflows/agdb_server.yaml index a0fe6bbc4..7289057d3 100644 --- a/.github/workflows/agdb_server.yaml +++ b/.github/workflows/agdb_server.yaml @@ -36,7 +36,10 @@ jobs: - uses: actions-rust-lang/setup-rust-toolchain@v1 - uses: taiki-e/install-action@cargo-llvm-cov - run: rustup component add llvm-tools-preview - - run: cargo llvm-cov --package agdb_server --all-features --ignore-filename-regex "agdb(.|..)src|agdb_derive|agdb_api|api.rs" --fail-uncovered-functions 0 --show-missing-lines + - run: cargo llvm-cov clean --workspace + - run: cargo llvm-cov --package agdb_server --all-features --no-report + - run: cargo llvm-cov --package agdb_server --all-features --no-report -- --ignored + - run: cargo llvm-cov report --ignore-filename-regex "agdb(.|..)src|agdb_derive|agdb_api|api.rs" --fail-uncovered-functions 0 --show-missing-lines agdb_server_test: runs-on: ubuntu-latest @@ -45,7 +48,7 @@ jobs: steps: - uses: actions/checkout@v4 - uses: actions-rust-lang/setup-rust-toolchain@v1 - - run: threshold=2; count=0; while cargo test --release --package agdb_server &> test.log && [[ "$count" != "$threshold" ]]; do count=$((count+1)); echo -n "."; done; cat test.log; echo "$count of $threshold tests run" + - run: threshold=2; count=0; while cargo test --release --package agdb_server -- --include-ignored &> test.log && [[ "$count" != "$threshold" ]]; do count=$((count+1)); echo -n "."; done; cat test.log; echo "$count of $threshold tests run" agdb_server_format: runs-on: ubuntu-latest diff --git a/.vscode/tasks.json b/.vscode/tasks.json index f7a5a9069..b7024a01c 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -27,17 +27,23 @@ }, { "label": "Coverage Server", - "type": "process", - "command": "cargo", - "args": ["llvm-cov", "--package", "agdb_server", "--all-features", "--ignore-filename-regex", "agdb(.|..)src|agdb_derive|agdb_api|api.rs", "--show-missing-lines"], + "type": "shell", + "command": "bash", + "args": [ + "-c", + "cargo llvm-cov clean --workspace && cargo llvm-cov -p agdb_server --all-features --no-report && cargo llvm-cov -p agdb_server --all-features --no-report -- --ignored && cargo llvm-cov report --ignore-filename-regex \"agdb(.|..)src|agdb_derive|agdb_api|api.rs\" --show-missing-lines" + ], "problemMatcher": ["$rustc"], "group": "build" }, { "label": "Coverage Server HTML", - "type": "process", - "command": "cargo", - "args": ["llvm-cov", "--package", "agdb_server", "--all-features", "--ignore-filename-regex", "agdb(.|..)src|agdb_derive|agdb_api|api.rs", "--html", "--open"], + "type": "shell", + "command": "bash", + "args": [ + "-c", + "cargo llvm-cov clean --workspace && cargo llvm-cov -p agdb_server --all-features --no-report && cargo llvm-cov -p agdb_server --all-features --no-report -- --ignored && cargo llvm-cov report --ignore-filename-regex \"agdb(.|..)src|agdb_derive|agdb_api|api.rs\" --html --open" + ], "problemMatcher": ["$rustc"], "group": "build" }, diff --git a/agdb/src/query.rs b/agdb/src/query.rs index 217013091..28aba5295 100644 --- a/agdb/src/query.rs +++ b/agdb/src/query.rs @@ -254,7 +254,7 @@ mod tests { .into(), QueryBuilder::remove().ids("node1").query().into(), ]; - format!("{:?}", queries); + let _ = format!("{:?}", queries); assert_eq!(queries, queries); } } diff --git a/agdb_api/rust/src/api_error.rs b/agdb_api/rust/src/api_error.rs index 97e604465..6ed8a9baa 100644 --- a/agdb_api/rust/src/api_error.rs +++ b/agdb_api/rust/src/api_error.rs @@ -40,7 +40,7 @@ mod tests { #[test] fn derived_from_debug() { - format!( + let _ = format!( "{:?}", AgdbApiError { status: 0, diff --git a/agdb_api/rust/src/api_types.rs b/agdb_api/rust/src/api_types.rs index a06ec30ff..47c96a3f3 100644 --- a/agdb_api/rust/src/api_types.rs +++ b/agdb_api/rust/src/api_types.rs @@ -54,7 +54,7 @@ pub struct ChangePassword { pub new_password: String, } -#[derive(Debug, Deserialize, Serialize, ToSchema, PartialEq)] +#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema, PartialEq)] pub struct ClusterStatus { pub address: String, pub status: bool, @@ -186,16 +186,16 @@ mod tests { #[test] fn derived_from_debug() { - format!("{:?}", DbType::Memory); - format!("{:?}", DbUserRole::Admin); - format!( + let _ = format!("{:?}", DbType::Memory); + let _ = format!("{:?}", DbUserRole::Admin); + let _ = format!( "{:?}", DbUser { user: "user".to_string(), role: DbUserRole::Admin } ); - format!( + let _ = format!( "{:?}", ServerDatabase { name: "db".to_string(), @@ -205,13 +205,13 @@ mod tests { backup: 0 } ); - format!( + let _ = format!( "{:?}", UserStatus { name: "user".to_string() } ); - format!( + let _ = format!( "{:?}", QueryAudit { timestamp: 0, @@ -219,8 +219,8 @@ mod tests { query: QueryType::SelectIndexes(SelectIndexesQuery {}) } ); - format!("{:?}", DbAudit(vec![])); - format!( + let _ = format!("{:?}", DbAudit(vec![])); + let _ = format!( "{:?}", ClusterStatus { address: "localhost".to_string(), @@ -314,4 +314,18 @@ mod tests { assert_eq!(status.cmp(&status), std::cmp::Ordering::Equal); } + + #[test] + fn derived_from_serde() { + let cs1 = ClusterStatus { + address: "localhost".to_string(), + status: true, + leader: false, + term: 0, + commit: 0, + }; + let data = serde_json::to_string(&cs1).unwrap(); + let cs2: ClusterStatus = serde_json::from_str(&data).unwrap(); + assert_eq!(cs1, cs2); + } } diff --git a/agdb_server/src/api.rs b/agdb_server/src/api.rs index a8b73fb7b..bc165b20f 100644 --- a/agdb_server/src/api.rs +++ b/agdb_server/src/api.rs @@ -124,3 +124,165 @@ impl Modify for BearerToken { } } } + +#[cfg(test)] +mod tests { + use super::*; + use agdb::Comparison; + use agdb::CountComparison; + use agdb::DbKeyOrder; + use agdb::QueryBuilder; + use agdb::QueryId; + use agdb::QueryType; + use agdb::UserValue; + use std::fs::File; + use std::io::Write; + + macro_rules! queries { + ($($x:expr),+ $(,)?) => { + { + let mut vec: Vec<(String, QueryType)> = Vec::new(); + $( + { + let mut as_string = stringify!($x).to_string(); + as_string.retain(|c| !c.is_whitespace()); + vec.push((as_string, $x.into())); + } + )* + vec + } + }; + } + + #[derive(Default, UserValue)] + struct T { + db_id: Option, + value1: String, + value2: i64, + } + + #[test] + fn openapi() { + let schema = Api::openapi().to_pretty_json().unwrap(); + let mut file = File::create("openapi.json").unwrap(); + file.write_all(schema.as_bytes()).unwrap(); + } + + #[test] + fn test_queries() { + #[rustfmt::skip] + let queries = queries![ +QueryBuilder::insert().aliases("a").ids(1).query(), +QueryBuilder::insert().aliases("a").ids("b").query(), +QueryBuilder::insert().aliases(vec!["a", "b"]).ids(vec![1, 2]).query(), +QueryBuilder::insert().edges().from(1).to(2).query(), +QueryBuilder::insert().edges().from("a").to("b").query(), +QueryBuilder::insert().edges().from("a").to(vec![1, 2]).query(), +QueryBuilder::insert().edges().from(vec![1, 2]).to(vec![2, 3]).query(), +QueryBuilder::insert().edges().from(vec![1, 2]).to(vec![2, 3]).each().query(), +QueryBuilder::insert().edges().from(vec![1, 2]).to(vec![2, 3]).each().values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), +QueryBuilder::insert().edges().from(vec![1, 2]).to(vec![2, 3]).each().values_uniform(vec![("k", 1).into(), (1, 10).into()]).query(), +QueryBuilder::insert().edges().from("a").to(vec![1, 2]).values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), +QueryBuilder::insert().edges().from("a").to(vec![1, 2]).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), +QueryBuilder::insert().edges().from(QueryBuilder::search().from("a").where_().node().query()).to(QueryBuilder::search().from("b").where_().node().query()).query(), +QueryBuilder::insert().edges().from(QueryBuilder::search().from("a").where_().node().query()).to(QueryBuilder::search().from("b").where_().node().query()).values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), +QueryBuilder::insert().edges().from(QueryBuilder::search().from("a").where_().node().query()).to(QueryBuilder::search().from("b").where_().node().query()).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), +QueryBuilder::insert().edges().ids(-3).from(1).to(2).query(), +QueryBuilder::insert().edges().ids(vec![-3, -4]).from(1).to(2).query(), +QueryBuilder::insert().edges().ids(QueryBuilder::search().from(1).where_().edge().query()).from(1).to(2).query(), +QueryBuilder::insert().index("key").query(), +QueryBuilder::insert().nodes().count(2).query(), +QueryBuilder::insert().nodes().count(2).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), +QueryBuilder::insert().nodes().aliases(vec!["a", "b"]).query(), +QueryBuilder::insert().nodes().aliases(vec!["a", "b"]).values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), +QueryBuilder::insert().nodes().aliases(vec!["a", "b"]).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), +QueryBuilder::insert().nodes().values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), +QueryBuilder::insert().nodes().ids(1).count(1).query(), +QueryBuilder::insert().nodes().ids(vec![1, 2]).count(1).query(), +QueryBuilder::insert().nodes().ids("a").count(1).query(), +QueryBuilder::insert().nodes().ids("a").aliases("a").query(), +QueryBuilder::insert().nodes().ids(vec!["a", "b"]).count(1).query(), +QueryBuilder::insert().nodes().ids(vec![1, 2]).values(vec![vec![("k", "v").into()], vec![(1, 10).into()]]).query(), +QueryBuilder::insert().nodes().ids(vec![1, 2]).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), +QueryBuilder::insert().nodes().ids(QueryBuilder::search().from(1).query()).count(1).query(), +QueryBuilder::insert().element(&T::default()).query(), +QueryBuilder::insert().elements(&[T::default(), T::default()]).query(), +QueryBuilder::insert().values(vec![vec![("k", "v").into(), (1, 10).into()], vec![("k", 2).into()]]).ids(vec![1, 2]).query(), +QueryBuilder::insert().values(vec![vec![("k", "v").into(), (1, 10).into()], vec![("k", 2).into()]]).ids(QueryBuilder::search().from("a").query()).query(), +QueryBuilder::insert().values_uniform(vec![("k", "v").into(), (1, 10).into()]).ids(vec![1, 2]).query(), +QueryBuilder::insert().values_uniform(vec![("k", "v").into(), (1, 10).into()]).ids(QueryBuilder::search().from("a").query()).query(), +QueryBuilder::remove().aliases("a").query(), +QueryBuilder::remove().aliases(vec!["a", "b"]).query(), +QueryBuilder::remove().ids(1).query(), +QueryBuilder::remove().ids("a").query(), +QueryBuilder::remove().ids(vec![1, 2]).query(), +QueryBuilder::remove().ids(vec!["a", "b"]).query(), +QueryBuilder::remove().ids(QueryBuilder::search().from("a").query()).query(), +QueryBuilder::remove().index("key").query(), +QueryBuilder::remove().values(vec!["k1".into(), "k2".into()]).ids(vec![1, 2]).query(), +QueryBuilder::remove().values(vec!["k1".into(), "k2".into()]).ids(QueryBuilder::search().from("a").query()).query(), +QueryBuilder::select().aliases().ids(vec![1, 2]).query(), +QueryBuilder::select().aliases().ids(QueryBuilder::search().from(1).query()).query(), +QueryBuilder::select().aliases().query(), +QueryBuilder::select().edge_count().ids(vec![1, 2]).query(), +QueryBuilder::select().edge_count_from().ids(vec![1, 2]).query(), +QueryBuilder::select().edge_count_to().ids(vec![1, 2]).query(), +QueryBuilder::select().ids("a").query(), +QueryBuilder::select().ids(vec![1, 2]).query(), +QueryBuilder::select().ids(QueryBuilder::search().from(1).query()).query(), +QueryBuilder::select().indexes().query(), +QueryBuilder::select().keys().ids("a").query(), +QueryBuilder::select().keys().ids(vec![1, 2]).query(), +QueryBuilder::select().keys().ids(QueryBuilder::search().from(1).query()).query(), +QueryBuilder::select().key_count().ids("a").query(), +QueryBuilder::select().key_count().ids(vec![1, 2]).query(), +QueryBuilder::select().key_count().ids(QueryBuilder::search().from(1).query()).query(), +QueryBuilder::select().node_count().query(), +QueryBuilder::select().values(vec!["k".into(), "k2".into()]).ids("a").query(), +QueryBuilder::select().values(vec!["k".into(), "k2".into()]).ids(vec![1, 2]).query(), +QueryBuilder::select().values(vec!["k".into(), "k2".into()]).ids(QueryBuilder::search().from(1).query()).query(), +QueryBuilder::search().from("a").query(), +QueryBuilder::search().to(1).query(), +QueryBuilder::search().from("a").to("b").query(), +QueryBuilder::search().breadth_first().from("a").query(), +QueryBuilder::search().depth_first().to(1).query(), +QueryBuilder::search().depth_first().from("a").query(), +QueryBuilder::search().elements().query(), +QueryBuilder::search().index("age").value(20).query(), +QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Desc("age".into()), DbKeyOrder::Asc("name".into())]).query(), +QueryBuilder::search().from(1).offset(10).query(), +QueryBuilder::search().from(1).limit(5).query(), +QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Desc("k".into())]).offset(10).query(), +QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Desc("k".into())]).limit(5).query(), +QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Desc("k".into())]).offset(10).limit(5).query(), +QueryBuilder::search().from(1).offset(10).limit(5).query(), +QueryBuilder::search().from(1).where_().distance(CountComparison::LessThan(3)).query(), +QueryBuilder::search().from(1).where_().edge().query(), +QueryBuilder::search().from(1).where_().edge_count(CountComparison::GreaterThan(2)).query(), +QueryBuilder::search().from(1).where_().edge_count_from(CountComparison::Equal(1)).query(), +QueryBuilder::search().from(1).where_().edge_count_to(CountComparison::NotEqual(1)).query(), +QueryBuilder::search().from(1).where_().node().query(), +QueryBuilder::search().from(1).where_().key("k").value(Comparison::Equal(1.into())).query(), +QueryBuilder::search().from(1).where_().keys(vec!["k1".into(), "k2".into()]).query(), +QueryBuilder::search().from(1).where_().not().keys(vec!["k1".into(), "k2".into()]).query(), +QueryBuilder::search().from(1).where_().ids(vec![1, 2]).query(), +QueryBuilder::search().from(1).where_().beyond().keys(vec!["k".into()]).query(), +QueryBuilder::search().from(1).where_().not().ids(vec![1, 2]).query(), +QueryBuilder::search().from(1).where_().not_beyond().ids("a").query(), +QueryBuilder::search().from(1).where_().node().or().edge().query(), +QueryBuilder::search().from(1).where_().node().and().distance(CountComparison::GreaterThanOrEqual(3)).query(), +QueryBuilder::search().from(1).where_().node().or().where_().edge().and().key("k").value(Comparison::Equal(1.into())).end_where().query(), +QueryBuilder::search().from(1).where_().node().or().where_().edge().and().key("k").value(Comparison::Contains(1.into())).end_where().query(), +QueryBuilder::search().from(1).where_().node().or().where_().edge().and().key("k").value(Comparison::Contains((vec![1, 2]).into())).end_where().query(), +QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Asc("k".into())]).where_().node().query(), +QueryBuilder::search().from(1).limit(1).where_().node().query(), +QueryBuilder::search().from(1).offset(1).where_().node().query(), +QueryBuilder::search().to(1).offset(1).query(), +QueryBuilder::search().to(1).limit(1).query(), +QueryBuilder::search().to(1).where_().node().query(), +QueryBuilder::search().to(1).order_by(vec![DbKeyOrder::Asc("k".into())]).where_().node().query() + ]; + + serde_json::to_writer_pretty(File::create("test_queries.json").unwrap(), &queries).unwrap(); + } +} diff --git a/agdb_server/src/app.rs b/agdb_server/src/app.rs index a4ca53e9c..4e73a9667 100644 --- a/agdb_server/src/app.rs +++ b/agdb_server/src/app.rs @@ -127,8 +127,9 @@ pub(crate) fn app( .route("/cluster/status", routing::get(routes::cluster::status)) .route( "/cluster/heartbeat", - routing::get(routes::cluster::heartbeat), + routing::post(routes::cluster::heartbeat), ) + .route("/cluster/vote", routing::get(routes::cluster::vote)) .route("/user/login", routing::post(routes::user::login)) .route("/user/logout", routing::post(routes::user::logout)) .route( diff --git a/agdb_server/src/cluster.rs b/agdb_server/src/cluster.rs index 75c32376b..7009ece7e 100644 --- a/agdb_server/src/cluster.rs +++ b/agdb_server/src/cluster.rs @@ -1,30 +1,149 @@ use crate::config::Config; +use crate::server_error::ServerError; use crate::server_error::ServerResult; use agdb::StableHash; -use agdb_api::AgdbApi; +use agdb_api::HttpClient; use agdb_api::ReqwestClient; +use axum::http::StatusCode; use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; use tokio::signal; use tokio::sync::broadcast; +use tokio::sync::RwLock; + +const TERM_TIMEOUT: Duration = Duration::from_secs(3); +const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(1); +const LOOP_TIMEOUT: Duration = Duration::from_millis(10); pub(crate) type Cluster = Arc; -type ClusterApi = AgdbApi; +type ClusterNode = Arc; + +pub(crate) struct ClusterNodeImpl { + client: ReqwestClient, + base_url: String, + token: Option, + index: usize, +} + +#[derive(Copy, Clone)] +pub(crate) enum ClusterState { + Leader, + LeaderElect, + Follower(usize), + Candidate, + Election, + Voted, +} + +pub(crate) struct ClusterData { + pub(crate) state: ClusterState, + pub(crate) timer: std::time::Instant, + pub(crate) term: u64, + pub(crate) voted: u64, + pub(crate) leader: Arc, +} -#[allow(dead_code)] pub(crate) struct ClusterImpl { - nodes: Vec, - cluster_hash: u64, + pub(crate) nodes: Vec, + pub(crate) cluster_hash: u64, + pub(crate) index: usize, + pub(crate) data: RwLock, +} + +impl ClusterImpl { + pub(crate) async fn leader(&self) -> Option { + match self.data.read().await.state { + ClusterState::Leader | ClusterState::LeaderElect => Some(self.index), + ClusterState::Follower(leader) => Some(leader), + ClusterState::Candidate | ClusterState::Election | ClusterState::Voted => None, + } + } +} + +impl ClusterNodeImpl { + fn new(address: &str, token: &str, index: usize) -> Self { + let base = if address.starts_with("http") || address.starts_with("https") { + address.to_string() + } else { + format!("http://{address}") + }; + + Self { + client: ReqwestClient::new(), + base_url: format!("{base}api/v1"), + token: Some(token.to_string()), + index, + } + } + + async fn heartbeat( + &self, + cluster_hash: u64, + term: u64, + leader: usize, + ) -> ServerResult<(u16, String)> { + self.client + .post::<(), String>( + &self.url(&format!( + "/cluster/heartbeat?cluster_hash={cluster_hash}&term={term}&leader={leader}" + )), + &None, + &self.token, + ) + .await + .map_err(|e| { + ServerError::new( + StatusCode::from_u16(e.status).unwrap_or(StatusCode::NOT_IMPLEMENTED), + &e.description, + ) + }) + } + + async fn vote( + &self, + cluster_hash: u64, + term: u64, + leader: usize, + ) -> ServerResult<(u16, String)> { + self.client + .get::( + &self.url(&format!( + "/cluster/vote?cluster_hash={cluster_hash}&term={term}&leader={leader}" + )), + &self.token, + ) + .await + .map_err(|e| { + ServerError::new( + StatusCode::from_u16(e.status).unwrap_or(StatusCode::NOT_IMPLEMENTED), + &e.description, + ) + }) + } + + fn url(&self, uri: &str) -> String { + format!("{}{uri}", self.base_url) + } } pub(crate) fn new(config: &Config) -> ServerResult { let mut nodes = vec![]; + let mut index = 0; - for node in &config.cluster { - if node != &config.address { - nodes.push(ClusterApi::new(ReqwestClient::new(), node.as_str())); + for (i, node) in config.cluster.iter().enumerate() { + if node == &config.address { + index = i; + } else { + nodes.push(ClusterNode::new(ClusterNodeImpl::new( + node.as_str(), + &config.cluster_token, + i, + ))); } } @@ -36,16 +155,186 @@ pub(crate) fn new(config: &Config) -> ServerResult { Ok(Cluster::new(ClusterImpl { nodes, cluster_hash, + index, + data: RwLock::new(ClusterData { + timer: Instant::now(), + state: ClusterState::Election, + term: 0, + voted: 0, + leader: Arc::new(AtomicBool::new(false)), + }), })) } async fn start_cluster(cluster: Cluster, shutdown_signal: Arc) -> ServerResult<()> { - if cluster.nodes.is_empty() { + if cluster.nodes.len() < 2 { return Ok(()); } while !shutdown_signal.load(Ordering::Relaxed) { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let state = cluster.data.read().await.state; + + match state { + ClusterState::LeaderElect => heartbeat(&cluster, shutdown_signal.clone()).await?, + ClusterState::Voted | ClusterState::Follower(_) => { + if cluster.data.read().await.timer.elapsed() > TERM_TIMEOUT { + let mut data = cluster.data.write().await; + data.state = ClusterState::Election; + data.timer = Instant::now(); + } + } + ClusterState::Election => { + if cluster.data.read().await.timer.elapsed() + >= Duration::from_secs(cluster.index as u64) + { + election(&cluster).await?; + } + } + ClusterState::Candidate | ClusterState::Leader => {} + } + + std::thread::sleep(LOOP_TIMEOUT); + } + + Ok(()) +} + +async fn heartbeat(cluster: &Cluster, shutdown_signal: Arc) -> ServerResult<()> { + cluster.data.write().await.state = ClusterState::Leader; + + let term = cluster.data.read().await.term; + let cluster_hash = cluster.cluster_hash; + let leader = cluster.index; + let is_leader = cluster.data.read().await.leader.clone(); + let cluster_index = cluster.index; + + for node in &cluster.nodes { + let node = node.clone(); + let is_leader = is_leader.clone(); + let shutdown_signal = shutdown_signal.clone(); + + tokio::spawn(async move { + while is_leader.load(Ordering::Relaxed) && !shutdown_signal.load(Ordering::Relaxed) { + match node.heartbeat(cluster_hash, term, leader).await { + Ok((status, message)) => { + if status != 200 { + tracing::warn!( + "[{cluster_index}] Heartbeat rejected by {}: ({}) {}", + node.index, + status, + message + ); + } + } + Err(e) => { + let message = format!( + "[{cluster_index}] Heartbeat error on node {}: ({}) {}", + node.index, e.status, e.description + ); + + if e.status.is_client_error() { + tracing::warn!(message); + } else { + tracing::error!(message); + } + } + } + + std::thread::sleep(HEARTBEAT_TIMEOUT); + } + }); + } + + Ok(()) +} + +async fn election(cluster: &Cluster) -> ServerResult<()> { + let timer = Instant::now(); + let election_term; + + { + let mut data = cluster.data.write().await; + election_term = data.term + 1; + data.state = ClusterState::Candidate; + data.voted = election_term; + } + + let cluster_hash = cluster.cluster_hash; + let index = cluster.index; + let quorum = (cluster.nodes.len() + 1) / 2 + 1; + + tracing::info!("[{index}] Starting election (cluster: {cluster_hash}, term: {election_term}, quorum: {quorum}/{})", cluster.nodes.len() + 1); + + let votes = Arc::new(AtomicUsize::new(1)); + let voted = Arc::new(AtomicUsize::new(1)); + + for node in &cluster.nodes { + let node = node.clone(); + let votes = votes.clone(); + let voted = voted.clone(); + let cluster = cluster.clone(); + + tokio::spawn(async move { + match node.vote(cluster_hash, election_term, index).await { + Ok(_) => { + tracing::info!( + "[{}] Vote for term {election_term} ACCEPTED by {}", + cluster.index, + node.index + ); + + if (votes.fetch_add(1, Ordering::Relaxed) + 1) == quorum { + tracing::info!( + "[{index}] Elected as leader for term {election_term} ({}ms)", + timer.elapsed().as_millis() + ); + + let mut data = cluster.data.write().await; + data.state = ClusterState::LeaderElect; + data.leader.store(true, Ordering::Relaxed); + data.term = election_term; + data.timer = Instant::now(); + } + } + Err(e) => { + if e.status.is_client_error() { + tracing::warn!( + "[{}] Vote for term {election_term} REJECTED by {}: ({}) {}", + cluster.index, + node.index, + e.status, + e.description + ); + } else { + tracing::error!( + "[{}] Vote for term {election_term} FAILED on {}: ({}) {}", + cluster.index, + node.index, + e.status, + e.description + ); + } + } + } + + if voted.fetch_add(1, Ordering::Relaxed) == cluster.nodes.len() { + let is_leader = cluster.data.read().await.leader.load(Ordering::Relaxed); + + if !is_leader { + tracing::warn!( + "[{index}] Election for term {election_term} failed - {}/{} (quorum: {quorum}/{}) ({}ms)", + votes.load(Ordering::Relaxed), + cluster.nodes.len() + 1, + cluster.nodes.len() + 1, + timer.elapsed().as_millis(), + ); + + let mut data = cluster.data.write().await; + data.state = ClusterState::Election; + data.timer = Instant::now(); + } + }; + }); } Ok(()) diff --git a/agdb_server/src/db_pool/server_db_storage.rs b/agdb_server/src/db_pool/server_db_storage.rs index dc5b76ed5..297a84b8c 100644 --- a/agdb_server/src/db_pool/server_db_storage.rs +++ b/agdb_server/src/db_pool/server_db_storage.rs @@ -146,7 +146,7 @@ mod tests { let _test_file_rename_dot = TestFile::new(".file_storage_rename.agdb"); let test_file_backup = TestFile::new("file_storage_backup.agdb"); let mut storage = ServerDbStorage::new(&format!("file:{}", test_file.0))?; - format!("{:?}", storage); + let _ = format!("{:?}", storage); storage.backup(&test_file_backup.0)?; assert!(std::path::Path::new(&test_file_backup.0).exists()); let other = storage.copy(&test_file_copy.0)?; @@ -174,7 +174,7 @@ mod tests { let _test_file_rename_dot = TestFile::new(".mapped_storage_rename.agdb"); let test_file2 = TestFile::new("mapped_storage_backup.agdb"); let mut storage = ServerDbStorage::new(&format!("mapped:{}", test_file.0))?; - format!("{:?}", storage); + let _ = format!("{:?}", storage); storage.backup(&test_file2.0)?; assert!(std::path::Path::new(&test_file2.0).exists()); let other = storage.copy(&test_file_copy.0)?; @@ -195,7 +195,7 @@ mod tests { #[test] fn memory_storage() -> anyhow::Result<()> { let mut storage = ServerDbStorage::new("memory:db_test.agdb")?; - format!("{:?}", storage); + let _ = format!("{:?}", storage); storage.backup("backup_test")?; let other = storage.copy("db_test_copy.agdb")?; assert_eq!(other.name(), "db_test_copy.agdb"); diff --git a/agdb_server/src/error_code.rs b/agdb_server/src/error_code.rs index 4b01a2ea0..a32bbca39 100644 --- a/agdb_server/src/error_code.rs +++ b/agdb_server/src/error_code.rs @@ -8,6 +8,10 @@ pub(crate) enum ErrorCode { DbExists, DbInvalid, QueryError, + ClusterHashMismatch, + TermMismatch, + LeaderExists, + AlreadyVoted, } impl From for StatusCode { @@ -25,6 +29,10 @@ impl From<&ErrorCode> for StatusCode { ErrorCode::DbExists => 465, ErrorCode::DbInvalid => 467, ErrorCode::QueryError => 470, + ErrorCode::ClusterHashMismatch => 480, + ErrorCode::TermMismatch => 481, + ErrorCode::LeaderExists => 482, + ErrorCode::AlreadyVoted => 483, }) .unwrap() } @@ -45,6 +53,10 @@ impl ErrorCode { ErrorCode::DbExists => "db already exists", ErrorCode::DbInvalid => "db invalid", ErrorCode::QueryError => "query error", + ErrorCode::ClusterHashMismatch => "cluster hash mismatch", + ErrorCode::TermMismatch => "term mismatch", + ErrorCode::LeaderExists => "leader exists", + ErrorCode::AlreadyVoted => "already voted", } } } @@ -64,5 +76,11 @@ mod tests { assert_eq!(ErrorCode::DbExists.as_str(), "db already exists"); assert_eq!(ErrorCode::DbInvalid.as_str(), "db invalid"); assert_eq!(ErrorCode::QueryError.as_str(), "query error"); + assert_eq!( + ErrorCode::ClusterHashMismatch.as_str(), + "cluster hash mismatch" + ); + assert_eq!(ErrorCode::TermMismatch.as_str(), "term mismatch"); + assert_eq!(ErrorCode::LeaderExists.as_str(), "leader exists"); } } diff --git a/agdb_server/src/logger.rs b/agdb_server/src/logger.rs index 684e58ffb..74c30429e 100644 --- a/agdb_server/src/logger.rs +++ b/agdb_server/src/logger.rs @@ -33,7 +33,9 @@ impl LogRecord { let message = serde_json::to_string(&self).unwrap_or_default(); match self.status { - ..=399 => tracing::info!(message), + ..=399 => { + tracing::info!(message) + } 400..=499 => tracing::warn!(message), 500.. => tracing::error!(message), } diff --git a/agdb_server/src/main.rs b/agdb_server/src/main.rs index 9743da2e4..40ff85236 100644 --- a/agdb_server/src/main.rs +++ b/agdb_server/src/main.rs @@ -39,166 +39,3 @@ async fn main() -> ServerResult { Ok(()) } - -#[cfg(test)] -mod tests { - use crate::api::Api; - use agdb::Comparison; - use agdb::CountComparison; - use agdb::DbKeyOrder; - use agdb::QueryBuilder; - use agdb::QueryId; - use agdb::QueryType; - use agdb::UserValue; - use std::fs::File; - use std::io::Write; - use utoipa::OpenApi; - - macro_rules! queries { - ($($x:expr),+ $(,)?) => { - { - let mut vec: Vec<(String, QueryType)> = Vec::new(); - $( - { - let mut as_string = stringify!($x).to_string(); - as_string.retain(|c| !c.is_whitespace()); - vec.push((as_string, $x.into())); - } - )* - vec - } - }; - } - - #[derive(Default, UserValue)] - struct T { - db_id: Option, - value1: String, - value2: i64, - } - - #[test] - fn openapi() { - let schema = Api::openapi().to_pretty_json().unwrap(); - let mut file = File::create("openapi.json").unwrap(); - file.write_all(schema.as_bytes()).unwrap(); - } - - #[test] - fn test_queries() { - #[rustfmt::skip] - let queries = queries![ -QueryBuilder::insert().aliases("a").ids(1).query(), -QueryBuilder::insert().aliases("a").ids("b").query(), -QueryBuilder::insert().aliases(vec!["a", "b"]).ids(vec![1, 2]).query(), -QueryBuilder::insert().edges().from(1).to(2).query(), -QueryBuilder::insert().edges().from("a").to("b").query(), -QueryBuilder::insert().edges().from("a").to(vec![1, 2]).query(), -QueryBuilder::insert().edges().from(vec![1, 2]).to(vec![2, 3]).query(), -QueryBuilder::insert().edges().from(vec![1, 2]).to(vec![2, 3]).each().query(), -QueryBuilder::insert().edges().from(vec![1, 2]).to(vec![2, 3]).each().values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), -QueryBuilder::insert().edges().from(vec![1, 2]).to(vec![2, 3]).each().values_uniform(vec![("k", 1).into(), (1, 10).into()]).query(), -QueryBuilder::insert().edges().from("a").to(vec![1, 2]).values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), -QueryBuilder::insert().edges().from("a").to(vec![1, 2]).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), -QueryBuilder::insert().edges().from(QueryBuilder::search().from("a").where_().node().query()).to(QueryBuilder::search().from("b").where_().node().query()).query(), -QueryBuilder::insert().edges().from(QueryBuilder::search().from("a").where_().node().query()).to(QueryBuilder::search().from("b").where_().node().query()).values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), -QueryBuilder::insert().edges().from(QueryBuilder::search().from("a").where_().node().query()).to(QueryBuilder::search().from("b").where_().node().query()).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), -QueryBuilder::insert().edges().ids(-3).from(1).to(2).query(), -QueryBuilder::insert().edges().ids(vec![-3, -4]).from(1).to(2).query(), -QueryBuilder::insert().edges().ids(QueryBuilder::search().from(1).where_().edge().query()).from(1).to(2).query(), -QueryBuilder::insert().index("key").query(), -QueryBuilder::insert().nodes().count(2).query(), -QueryBuilder::insert().nodes().count(2).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), -QueryBuilder::insert().nodes().aliases(vec!["a", "b"]).query(), -QueryBuilder::insert().nodes().aliases(vec!["a", "b"]).values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), -QueryBuilder::insert().nodes().aliases(vec!["a", "b"]).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), -QueryBuilder::insert().nodes().values(vec![vec![("k", 1).into()], vec![("k", 2).into()]]).query(), -QueryBuilder::insert().nodes().ids(1).count(1).query(), -QueryBuilder::insert().nodes().ids(vec![1, 2]).count(1).query(), -QueryBuilder::insert().nodes().ids("a").count(1).query(), -QueryBuilder::insert().nodes().ids("a").aliases("a").query(), -QueryBuilder::insert().nodes().ids(vec!["a", "b"]).count(1).query(), -QueryBuilder::insert().nodes().ids(vec![1, 2]).values(vec![vec![("k", "v").into()], vec![(1, 10).into()]]).query(), -QueryBuilder::insert().nodes().ids(vec![1, 2]).values_uniform(vec![("k", "v").into(), (1, 10).into()]).query(), -QueryBuilder::insert().nodes().ids(QueryBuilder::search().from(1).query()).count(1).query(), -QueryBuilder::insert().element(&T::default()).query(), -QueryBuilder::insert().elements(&[T::default(), T::default()]).query(), -QueryBuilder::insert().values(vec![vec![("k", "v").into(), (1, 10).into()], vec![("k", 2).into()]]).ids(vec![1, 2]).query(), -QueryBuilder::insert().values(vec![vec![("k", "v").into(), (1, 10).into()], vec![("k", 2).into()]]).ids(QueryBuilder::search().from("a").query()).query(), -QueryBuilder::insert().values_uniform(vec![("k", "v").into(), (1, 10).into()]).ids(vec![1, 2]).query(), -QueryBuilder::insert().values_uniform(vec![("k", "v").into(), (1, 10).into()]).ids(QueryBuilder::search().from("a").query()).query(), -QueryBuilder::remove().aliases("a").query(), -QueryBuilder::remove().aliases(vec!["a", "b"]).query(), -QueryBuilder::remove().ids(1).query(), -QueryBuilder::remove().ids("a").query(), -QueryBuilder::remove().ids(vec![1, 2]).query(), -QueryBuilder::remove().ids(vec!["a", "b"]).query(), -QueryBuilder::remove().ids(QueryBuilder::search().from("a").query()).query(), -QueryBuilder::remove().index("key").query(), -QueryBuilder::remove().values(vec!["k1".into(), "k2".into()]).ids(vec![1, 2]).query(), -QueryBuilder::remove().values(vec!["k1".into(), "k2".into()]).ids(QueryBuilder::search().from("a").query()).query(), -QueryBuilder::select().aliases().ids(vec![1, 2]).query(), -QueryBuilder::select().aliases().ids(QueryBuilder::search().from(1).query()).query(), -QueryBuilder::select().aliases().query(), -QueryBuilder::select().edge_count().ids(vec![1, 2]).query(), -QueryBuilder::select().edge_count_from().ids(vec![1, 2]).query(), -QueryBuilder::select().edge_count_to().ids(vec![1, 2]).query(), -QueryBuilder::select().ids("a").query(), -QueryBuilder::select().ids(vec![1, 2]).query(), -QueryBuilder::select().ids(QueryBuilder::search().from(1).query()).query(), -QueryBuilder::select().indexes().query(), -QueryBuilder::select().keys().ids("a").query(), -QueryBuilder::select().keys().ids(vec![1, 2]).query(), -QueryBuilder::select().keys().ids(QueryBuilder::search().from(1).query()).query(), -QueryBuilder::select().key_count().ids("a").query(), -QueryBuilder::select().key_count().ids(vec![1, 2]).query(), -QueryBuilder::select().key_count().ids(QueryBuilder::search().from(1).query()).query(), -QueryBuilder::select().node_count().query(), -QueryBuilder::select().values(vec!["k".into(), "k2".into()]).ids("a").query(), -QueryBuilder::select().values(vec!["k".into(), "k2".into()]).ids(vec![1, 2]).query(), -QueryBuilder::select().values(vec!["k".into(), "k2".into()]).ids(QueryBuilder::search().from(1).query()).query(), -QueryBuilder::search().from("a").query(), -QueryBuilder::search().to(1).query(), -QueryBuilder::search().from("a").to("b").query(), -QueryBuilder::search().breadth_first().from("a").query(), -QueryBuilder::search().depth_first().to(1).query(), -QueryBuilder::search().depth_first().from("a").query(), -QueryBuilder::search().elements().query(), -QueryBuilder::search().index("age").value(20).query(), -QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Desc("age".into()), DbKeyOrder::Asc("name".into())]).query(), -QueryBuilder::search().from(1).offset(10).query(), -QueryBuilder::search().from(1).limit(5).query(), -QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Desc("k".into())]).offset(10).query(), -QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Desc("k".into())]).limit(5).query(), -QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Desc("k".into())]).offset(10).limit(5).query(), -QueryBuilder::search().from(1).offset(10).limit(5).query(), -QueryBuilder::search().from(1).where_().distance(CountComparison::LessThan(3)).query(), -QueryBuilder::search().from(1).where_().edge().query(), -QueryBuilder::search().from(1).where_().edge_count(CountComparison::GreaterThan(2)).query(), -QueryBuilder::search().from(1).where_().edge_count_from(CountComparison::Equal(1)).query(), -QueryBuilder::search().from(1).where_().edge_count_to(CountComparison::NotEqual(1)).query(), -QueryBuilder::search().from(1).where_().node().query(), -QueryBuilder::search().from(1).where_().key("k").value(Comparison::Equal(1.into())).query(), -QueryBuilder::search().from(1).where_().keys(vec!["k1".into(), "k2".into()]).query(), -QueryBuilder::search().from(1).where_().not().keys(vec!["k1".into(), "k2".into()]).query(), -QueryBuilder::search().from(1).where_().ids(vec![1, 2]).query(), -QueryBuilder::search().from(1).where_().beyond().keys(vec!["k".into()]).query(), -QueryBuilder::search().from(1).where_().not().ids(vec![1, 2]).query(), -QueryBuilder::search().from(1).where_().not_beyond().ids("a").query(), -QueryBuilder::search().from(1).where_().node().or().edge().query(), -QueryBuilder::search().from(1).where_().node().and().distance(CountComparison::GreaterThanOrEqual(3)).query(), -QueryBuilder::search().from(1).where_().node().or().where_().edge().and().key("k").value(Comparison::Equal(1.into())).end_where().query(), -QueryBuilder::search().from(1).where_().node().or().where_().edge().and().key("k").value(Comparison::Contains(1.into())).end_where().query(), -QueryBuilder::search().from(1).where_().node().or().where_().edge().and().key("k").value(Comparison::Contains((vec![1, 2]).into())).end_where().query(), -QueryBuilder::search().from(1).order_by(vec![DbKeyOrder::Asc("k".into())]).where_().node().query(), -QueryBuilder::search().from(1).limit(1).where_().node().query(), -QueryBuilder::search().from(1).offset(1).where_().node().query(), -QueryBuilder::search().to(1).offset(1).query(), -QueryBuilder::search().to(1).limit(1).query(), -QueryBuilder::search().to(1).where_().node().query(), -QueryBuilder::search().to(1).order_by(vec![DbKeyOrder::Asc("k".into())]).where_().node().query() - ]; - - serde_json::to_writer_pretty(File::create("test_queries.json").unwrap(), &queries).unwrap(); - } -} diff --git a/agdb_server/src/routes/cluster.rs b/agdb_server/src/routes/cluster.rs index d41a11fd9..dfdeb3789 100644 --- a/agdb_server/src/routes/cluster.rs +++ b/agdb_server/src/routes/cluster.rs @@ -1,13 +1,131 @@ +use crate::cluster::Cluster; +use crate::cluster::ClusterState; use crate::config::Config; +use crate::error_code::ErrorCode; use crate::server_error::ServerResult; use crate::user_id::ClusterId; use agdb_api::ClusterStatus; +use axum::extract::Query; use axum::extract::State; use axum::http::StatusCode; use axum::Json; +use serde::Deserialize; +use std::sync::atomic::Ordering; +use std::time::Instant; -pub(crate) async fn heartbeat(_cluster_id: ClusterId) -> ServerResult { - Ok(StatusCode::OK) +#[derive(Deserialize)] +pub(crate) struct ClusterParams { + cluster_hash: u64, + term: u64, + leader: usize, +} + +pub(crate) async fn heartbeat( + _cluster_id: ClusterId, + State(cluster): State, + request: Query, +) -> ServerResult<(StatusCode, Json)> { + if cluster.cluster_hash != request.cluster_hash { + return Ok(( + ErrorCode::ClusterHashMismatch.into(), + Json(format!( + "Cluster hash mismatch: expected {}, got {}", + cluster.cluster_hash, request.cluster_hash + )), + )); + } + + let current_term = cluster.data.read().await.term; + + if request.term < current_term { + return Ok(( + ErrorCode::TermMismatch.into(), + Json(format!( + "Term mismatch: expected higher term than {}, got {}", + current_term, request.term + )), + )); + } + + let state = cluster.data.read().await.state; + + if let ClusterState::Follower(leader) = state { + if leader == request.leader && request.term == current_term { + return Ok((StatusCode::OK, Json(String::new()))); + } + } + + tracing::info!( + "[{}] Becoming a follower of node {}, term: {}", + cluster.index, + request.leader, + request.term + ); + + let mut data = cluster.data.write().await; + data.term = request.term; + data.state = ClusterState::Follower(request.leader); + data.leader.store(false, Ordering::Relaxed); + data.timer = Instant::now(); + + Ok((StatusCode::OK, Json(String::new()))) +} + +pub(crate) async fn vote( + _cluster_id: ClusterId, + State(cluster): State, + request: Query, +) -> ServerResult<(StatusCode, Json)> { + if cluster.cluster_hash != request.cluster_hash { + return Ok(( + ErrorCode::ClusterHashMismatch.into(), + Json(format!( + "Cluster hash mismatch: expected local ({}) == other ({})", + cluster.cluster_hash, request.cluster_hash + )), + )); + } + + let current_leader = cluster.leader().await; + + if let Some(leader) = current_leader { + return Ok(( + ErrorCode::LeaderExists.into(), + Json(format!("Leader already exists: node {}", leader)), + )); + } + + let current_term = cluster.data.read().await.term; + + if request.term <= current_term { + return Ok(( + ErrorCode::TermMismatch.into(), + Json(format!( + "Term mismatch: epxected current ({}) < requested ({})", + current_term, request.term + )), + )); + } + + let voted = cluster.data.read().await.voted; + + if request.term <= voted { + return Ok(( + ErrorCode::AlreadyVoted.into(), + Json(format!( + "Already voted: expected last vote ({voted}) < {}", + request.term + )), + )); + } + + let mut data = cluster.data.write().await; + data.state = ClusterState::Voted; + data.term = request.term; + data.voted = request.term; + data.timer = Instant::now(); + + Ok((StatusCode::OK, Json(String::new()))) } #[utoipa::path(get, @@ -20,31 +138,68 @@ pub(crate) async fn heartbeat(_cluster_id: ClusterId) -> ServerResult, + State(cluster): State, ) -> ServerResult<(StatusCode, Json>)> { - let mut statuses = Vec::with_capacity(config.cluster.len()); - let client = reqwest::Client::new(); + let mut statuses = vec![ClusterStatus::default(); config.cluster.len()]; + let mut tasks = Vec::new(); - for node in &config.cluster { - let status = if node == &config.address { - true - } else { + let leader; + let term; + + { + let data = cluster.data.read().await; + leader = cluster.leader().await; + term = data.term; + } + + for (index, node) in config.cluster.iter().enumerate() { + if index != cluster.index { + let address = node.as_str().to_string(); let url = format!("{}api/v1/status", node.as_str()); - let response = client - .get(&url) - .timeout(std::time::Duration::from_secs(1)) - .send() - .await; - response.is_ok() && response?.status().is_success() + + tasks.push(tokio::spawn(async move { + let client = reqwest::Client::new(); + + let response = client + .get(&url) + .timeout(std::time::Duration::from_secs(1)) + .send() + .await; + + let status = if let Ok(response) = response { + response.status().is_success() + } else { + false + }; + + ( + index, + ClusterStatus { + address, + status, + leader: status && Some(index) == leader, + term, + commit: 0, + }, + ) + })); + } else { + let status = &mut statuses[index]; + status.address = node.as_str().to_string(); + status.status = true; + status.leader = Some(index) == leader; + status.term = term; + status.commit = 0; }; + } - statuses.push(ClusterStatus { - address: node.as_str().to_string(), - status, - leader: false, - term: 0, - commit: 0, - }); + for task in tasks { + if let Ok((index, status)) = task.await { + statuses[index] = status; + } } + statuses.sort_by(|a, b| a.address.cmp(&b.address)); + Ok((StatusCode::OK, Json(statuses))) } diff --git a/agdb_server/src/server_error.rs b/agdb_server/src/server_error.rs index dc692c04a..8d0921b8e 100644 --- a/agdb_server/src/server_error.rs +++ b/agdb_server/src/server_error.rs @@ -41,7 +41,7 @@ mod tests { #[test] fn derived_from_debug() { - format!("{:?}", ServerError::from("my error")); + let _ = format!("{:?}", ServerError::from("my error")); } #[test] diff --git a/agdb_server/src/server_state.rs b/agdb_server/src/server_state.rs index bb69a0414..1a4b07305 100644 --- a/agdb_server/src/server_state.rs +++ b/agdb_server/src/server_state.rs @@ -30,3 +30,9 @@ impl FromRef for Config { input.config.clone() } } + +impl FromRef for Cluster { + fn from_ref(input: &ServerState) -> Self { + input.cluster.clone() + } +} diff --git a/agdb_server/tests/cluster/cluster_test.rs b/agdb_server/tests/cluster/cluster_test.rs new file mode 100644 index 000000000..0ded60de0 --- /dev/null +++ b/agdb_server/tests/cluster/cluster_test.rs @@ -0,0 +1,138 @@ +use crate::TestServer; +use crate::TestServerImpl; +use crate::ADMIN; +use crate::HOST; +use crate::SERVER_DATA_DIR; +use agdb_api::AgdbApi; +use agdb_api::ClusterStatus; +use agdb_api::ReqwestClient; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +async fn wait_for_leader( + client: Arc>, +) -> anyhow::Result> { + let now = Instant::now(); + + while now.elapsed().as_millis() < 10000 { + let status = client.cluster_status().await?; + if status.1.iter().any(|s| s.leader) { + return Ok(status.1); + } + std::thread::sleep(std::time::Duration::from_millis(250)); + } + + Err(anyhow::anyhow!("Leader not found within 10 seconds")) +} + +async fn create_cluster( + nodes: usize, +) -> anyhow::Result>)>> { + let mut configs = Vec::with_capacity(nodes); + let mut cluster = Vec::with_capacity(nodes); + let mut servers = Vec::with_capacity(nodes); + let mut statuses = Vec::with_capacity(nodes); + + for _ in 0..nodes { + let port = TestServerImpl::next_port(); + let mut config = HashMap::<&str, serde_yaml::Value>::new(); + config.insert("bind", format!("{HOST}:{port}").into()); + config.insert("address", format!("http://{HOST}:{port}").into()); + config.insert("admin", ADMIN.into()); + config.insert("basepath", "".into()); + config.insert("data_dir", SERVER_DATA_DIR.into()); + config.insert("cluster_token", "test".into()); + + configs.push(config); + cluster.push(format!("http://{HOST}:{port}")); + } + + for config in &mut configs { + config.insert("cluster", cluster.clone().into()); + } + + for config in configs { + let server = TestServerImpl::with_config(config).await?; + let client = Arc::new(AgdbApi::new(ReqwestClient::new(), &server.address)); + servers.push((server, client)); + } + + for has_leader in servers + .iter() + .map(|(_, c)| tokio::spawn(wait_for_leader(c.clone()))) + { + statuses.push(has_leader.await??); + } + + for status in &statuses[1..] { + assert_eq!(statuses[0], *status); + } + + Ok(servers) +} + +#[tokio::test] +#[ignore] +async fn cluster_rebalance() -> anyhow::Result<()> { + let mut servers = create_cluster(3).await?; + + let mut client = AgdbApi::new(ReqwestClient::new(), &servers[0].0.address); + client.user_login(ADMIN, ADMIN).await?; + client.admin_shutdown().await?; + assert!(servers[0].0.process.wait()?.success()); + + let mut statuses = Vec::with_capacity(3); + + for has_leader in servers[1..] + .iter() + .map(|(_, c)| tokio::spawn(wait_for_leader(c.clone()))) + { + statuses.push(has_leader.await??); + } + + for status in &statuses[1..] { + assert_eq!(statuses[0], *status); + } + + Ok(()) +} + +#[tokio::test] +async fn cluster_status() { + let server = TestServer::new().await.unwrap(); + let (code, status) = server.api.cluster_status().await.unwrap(); + + assert_eq!(code, 200); + assert_eq!(status.len(), 0); +} + +#[tokio::test] +async fn heartbeat_no_token() -> anyhow::Result<()> { + let server = TestServer::new().await?; + let client = reqwest::Client::new(); + let status = client + .post(server.full_url("/cluster/heartbeat?cluster_hash=test&term=1&leader=0")) + .send() + .await? + .status(); + + assert_eq!(status, 401); + + Ok(()) +} + +#[tokio::test] +async fn vote_no_token() -> anyhow::Result<()> { + let server = TestServer::new().await?; + let client = reqwest::Client::new(); + let status = client + .get(server.full_url("/cluster/vote?cluster_hash=test&term=1&leader=0")) + .send() + .await? + .status(); + + assert_eq!(status, 401); + + Ok(()) +} diff --git a/agdb_server/tests/cluster/mod.rs b/agdb_server/tests/cluster/mod.rs new file mode 100644 index 000000000..595b75eaa --- /dev/null +++ b/agdb_server/tests/cluster/mod.rs @@ -0,0 +1 @@ +mod cluster_test; diff --git a/agdb_server/tests/routes/cluster_test.rs b/agdb_server/tests/routes/cluster_test.rs deleted file mode 100644 index cef8e537a..000000000 --- a/agdb_server/tests/routes/cluster_test.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::TestServer; -use crate::TestServerImpl; -use crate::ADMIN; -use crate::HOST; -use crate::SERVER_DATA_DIR; -use agdb_api::AgdbApi; -use agdb_api::ReqwestClient; -use std::collections::HashMap; - -#[tokio::test] -async fn cluster_established() -> anyhow::Result<()> { - let port1 = TestServerImpl::next_port(); - let port2 = TestServerImpl::next_port(); - let port3 = TestServerImpl::next_port(); - let cluster = vec![ - format!("http://{HOST}:{port1}"), - format!("http://{HOST}:{port2}"), - format!("http://{HOST}:{port3}"), - ]; - - let mut config1 = HashMap::<&str, serde_yaml::Value>::new(); - config1.insert("bind", format!("{HOST}:{port1}").into()); - config1.insert("address", format!("http://{HOST}:{port1}").into()); - config1.insert("admin", ADMIN.into()); - config1.insert("basepath", "".into()); - config1.insert("data_dir", SERVER_DATA_DIR.into()); - config1.insert("cluster_token", "test".into()); - config1.insert("cluster", cluster.into()); - - let mut config2 = config1.clone(); - config2.insert("bind", format!("{HOST}:{port2}").into()); - config2.insert("address", format!("http://{HOST}:{port2}").into()); - - let mut config3 = config1.clone(); - config3.insert("bind", format!("{HOST}:{port3}").into()); - config3.insert("address", format!("http://{HOST}:{port3}").into()); - - let server1 = TestServerImpl::with_config(config1).await?; - let server2 = TestServerImpl::with_config(config2).await?; - let server3 = TestServerImpl::with_config(config3).await?; - - let client1 = AgdbApi::new(ReqwestClient::new(), &server1.address); - let client2 = AgdbApi::new(ReqwestClient::new(), &server2.address); - let client3 = AgdbApi::new(ReqwestClient::new(), &server3.address); - - let status1 = client1.cluster_status().await?; - let status2 = client2.cluster_status().await?; - let status3 = client3.cluster_status().await?; - - assert_eq!(status1.0, 200); - assert_eq!(status2.0, 200); - assert_eq!(status3.0, 200); - - assert_eq!(status1.1, status2.1); - assert_eq!(status1.1, status3.1); - - //assert!(status1.1.iter().any(|s| s.leader)); - - Ok(()) -} - -#[tokio::test] -async fn cluster_user() -> anyhow::Result<()> { - let server = TestServer::new().await?; - let client = reqwest::Client::new(); - let status = client - .get(server.full_url("/cluster/heartbeat")) - .bearer_auth("test") - .send() - .await? - .status(); - - assert_eq!(status, 200); - - Ok(()) -} - -#[tokio::test] -async fn cluster_user_no_token() -> anyhow::Result<()> { - let server = TestServer::new().await?; - let client = reqwest::Client::new(); - let status = client - .get(server.full_url("/cluster/heartbeat")) - .send() - .await? - .status(); - - assert_eq!(status, 401); - - Ok(()) -} diff --git a/agdb_server/tests/routes/mod.rs b/agdb_server/tests/routes/mod.rs index 0f4b06328..fd6a3342c 100644 --- a/agdb_server/tests/routes/mod.rs +++ b/agdb_server/tests/routes/mod.rs @@ -15,7 +15,6 @@ mod admin_user_add_test; mod admin_user_change_password_test; mod admin_user_list_test; mod admin_user_remove_test; -mod cluster_test; mod db_add_test; mod db_audit_test; mod db_backup_restore_test; diff --git a/agdb_server/tests/test_server.rs b/agdb_server/tests/test_server.rs index 54bf5a6e1..64b715076 100644 --- a/agdb_server/tests/test_server.rs +++ b/agdb_server/tests/test_server.rs @@ -1,3 +1,4 @@ +mod cluster; mod routes; use agdb_api::AgdbApi;