Skip to content

Commit

Permalink
rewrite test server shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelvlach committed Dec 29, 2024
1 parent 8b6515c commit 78588b0
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 69 deletions.
3 changes: 1 addition & 2 deletions agdb_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ agdb_api = { version = "0.9.2", path = "../agdb_api/rust", features = ["reqwest"
axum = { version = "0.7", features = ["http2"] }
axum-extra = { version = "0.9", features = ["typed-header"] }
http-body-util = "0.1"
reqwest = { version = "0.12", features = ["json", "blocking", "stream"] }
reqwest = { version = "0.12", features = ["json", "stream"] }
ring = "0.17"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand All @@ -33,5 +33,4 @@ utoipa-rapidoc = { version = "4", features = ["axum"] }
uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
assert_cmd = "2"
anyhow = "1"
9 changes: 2 additions & 7 deletions agdb_server/tests/routes/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use agdb_api::DbResource;
use agdb_api::DbType;
use agdb_api::DbUserRole;
use agdb_api::ReqwestClient;
use assert_cmd::cargo::CommandCargoExt;
use std::process::Command;
use std::time::Duration;

#[tokio::test]
Expand All @@ -29,7 +27,7 @@ async fn rebalance() -> anyhow::Result<()> {
);
leader.user_login(ADMIN, ADMIN).await?;
leader.admin_shutdown().await?;
assert!(servers[0].process.wait()?.success());
servers[0].wait().await?;

let mut statuses = Vec::with_capacity(servers.len() - 1);

Expand All @@ -50,10 +48,7 @@ async fn rebalance() -> anyhow::Result<()> {
assert_eq!(statuses[0], *status);
}

let dir = &servers[0].dir;
servers[0].process = Command::cargo_bin("agdb_server")?
.current_dir(dir)
.spawn()?;
servers[0].restart()?;
wait_for_ready(&leader).await?;

statuses.clear();
Expand Down
40 changes: 13 additions & 27 deletions agdb_server/tests/routes/misc_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ use agdb::QueryBuilder;
use agdb_api::AgdbApi;
use agdb_api::DbType;
use agdb_api::ReqwestClient;
use assert_cmd::cargo::CommandCargoExt;
use reqwest::StatusCode;
use std::collections::HashMap;
use std::path::Path;
use std::process::Command;
use std::time::Duration;

#[tokio::test]
Expand Down Expand Up @@ -84,10 +82,8 @@ async fn config_reuse() -> anyhow::Result<()> {
);
client.user_login(ADMIN, ADMIN).await?;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
server.process = Command::cargo_bin("agdb_server")?
.current_dir(&server.dir)
.spawn()?;
server.wait().await?;
server.restart()?;
wait_for_ready(&client).await?;
Ok(())
}
Expand Down Expand Up @@ -115,12 +111,10 @@ async fn db_list_after_shutdown() -> anyhow::Result<()> {
client.user_logout().await?;
client.user_login(ADMIN, ADMIN).await?;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
server.wait().await?;
}

server.process = Command::cargo_bin("agdb_server")?
.current_dir(&server.dir)
.spawn()?;
server.restart()?;
wait_for_ready(&client).await?;
client.user_login("userx", "userxpassword").await?;
let dbs = client.db_list().await?.1;
Expand Down Expand Up @@ -152,14 +146,12 @@ async fn db_list_after_shutdown_corrupted_data() -> anyhow::Result<()> {
client.user_logout().await?;
client.user_login(ADMIN, ADMIN).await?;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
server.wait().await?;
}

std::fs::remove_dir_all(Path::new(&server.data_dir).join("userx"))?;

server.process = Command::cargo_bin("agdb_server")?
.current_dir(&server.dir)
.spawn()?;
server.restart()?;
wait_for_ready(&client).await?;
client.user_login("userx", "userxpassword").await?;
let dbs = client.db_list().await?.1;
Expand Down Expand Up @@ -217,12 +209,10 @@ async fn location_change_after_restart() -> anyhow::Result<()> {
client.user_logout().await?;
client.user_login(ADMIN, ADMIN).await?;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
server.wait().await?;
}

server.process = Command::cargo_bin("agdb_server")?
.current_dir(&server.dir)
.spawn()?;
server.restart()?;
wait_for_ready(&client).await?;
client.user_login("user1", "userxpassword").await?;
let results = client
Expand Down Expand Up @@ -256,23 +246,21 @@ async fn reset_admin_password() -> anyhow::Result<()> {
client.admin_user_add("user1", "password123").await?;
client.user_change_password(ADMIN, "lostpassword").await?;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
server.wait().await?;
}

let config_file = Path::new(&server.dir).join(CONFIG_FILE);
let new_config =
std::fs::read_to_string(&config_file)?.replace("admin: admin", "admin: NEW_ADMIN");
std::fs::write(config_file, new_config)?;

server.process = Command::cargo_bin("agdb_server")?
.current_dir(&server.dir)
.spawn()?;
server.restart()?;
wait_for_ready(&client).await?;

client.user_login("NEW_ADMIN", "NEW_ADMIN").await?;
let list = client.admin_user_list().await;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
server.wait().await?;
assert_eq!(list?.1.len(), 3);

Ok(())
Expand Down Expand Up @@ -308,12 +296,10 @@ async fn memory_db_from_backup() -> anyhow::Result<()> {
assert_eq!(status, 201);
client.user_login(ADMIN, ADMIN).await?;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
server.wait().await?;
}

server.process = Command::cargo_bin("agdb_server")?
.current_dir(&server.dir)
.spawn()?;
server.restart()?;
wait_for_ready(&client).await?;
client.user_login(owner, "password123").await?;

Expand Down
109 changes: 76 additions & 33 deletions agdb_server/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use agdb_api::AgdbApi;
use agdb_api::ClusterStatus;
use agdb_api::ReqwestClient;
use anyhow::anyhow;
use assert_cmd::prelude::*;
use std::collections::HashMap;
use std::path::Path;
use std::process::Child;
use std::process::Command;
use std::path::PathBuf;
use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use tokio::process::Child;
use tokio::process::Command;

const ADMIN: &str = "admin";
const BINARY: &str = "agdb_server";
Expand All @@ -36,6 +36,13 @@ static SERVER: std::sync::OnceLock<tokio::sync::RwLock<Option<TestServerImpl>>>
static CLUSTER: std::sync::OnceLock<tokio::sync::RwLock<Option<ClusterImpl>>> =
std::sync::OnceLock::new();

fn server_bin() -> anyhow::Result<PathBuf> {
let mut path = std::env::current_exe()?;
path.pop();
path.pop();
Ok(path.join(format!("{BINARY}{}", std::env::consts::EXE_SUFFIX)))
}

pub struct TestServer {
pub dir: String,
pub data_dir: String,
Expand All @@ -46,7 +53,7 @@ pub struct TestServerImpl {
pub dir: String,
pub data_dir: String,
pub address: String,
pub process: Child,
pub process: Option<Child>,
pub instances: u16,
}

Expand Down Expand Up @@ -87,7 +94,10 @@ impl TestServerImpl {
address.clone()
};

let mut process = Command::cargo_bin(BINARY)?.current_dir(&dir).spawn()?;
let mut process = Command::new(server_bin()?)
.current_dir(&dir)
.kill_on_drop(true)
.spawn()?;
let api = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
Expand All @@ -104,7 +114,7 @@ impl TestServerImpl {
dir,
data_dir,
address: api_address,
process,
process: Some(process),
instances: 1,
})
}
Expand Down Expand Up @@ -141,12 +151,28 @@ impl TestServerImpl {
PORT.fetch_add(1, Ordering::Relaxed) + std::process::id() as u16
}

fn shutdown_server(&mut self) -> anyhow::Result<()> {
if self.process.try_wait()?.is_some() {
return Ok(());
pub fn restart(&mut self) -> anyhow::Result<()> {
self.process = Some(
Command::new(server_bin()?)
.current_dir(&self.dir)
.kill_on_drop(true)
.spawn()?,
);
Ok(())
}

pub async fn wait(&mut self) -> anyhow::Result<()> {
if let Some(p) = self.process.as_mut() {
p.wait().await?;
}

let mut address = self.address.clone();
Ok(())
}

async fn shutdown_server(mut process: Child, mut address: String) -> anyhow::Result<()> {
if process.try_wait()?.is_some() {
return Ok(());
}

if !address.starts_with("http") {
address = format!("http://{}", address);
Expand All @@ -156,36 +182,34 @@ impl TestServerImpl {
admin.insert("username", ADMIN.to_string());
admin.insert("password", ADMIN.to_string());

std::thread::spawn(move || -> anyhow::Result<()> {
let client = reqwest::blocking::Client::new();
let token: String = client
.post(format!("{}/api/v1/user/login", address))
.json(&admin)
.timeout(Duration::from_secs(10))
.send()?
.json()?;

client
.post(format!("{}/api/v1/admin/shutdown", address))
.timeout(Duration::from_secs(10))
.bearer_auth(token)
.send()?;
Ok(())
})
.join()
.map_err(|e| anyhow!("{:?}", e))??;
let client = reqwest::Client::new();
let token: String = client
.post(format!("{}/api/v1/user/login", address))
.json(&admin)
.timeout(Duration::from_secs(10))
.send()
.await?
.json()
.await?;

client
.post(format!("{}/api/v1/admin/shutdown", address))
.timeout(Duration::from_secs(10))
.bearer_auth(token)
.send()
.await?;

for _ in 0..SHUTDOWN_RETRY_ATTEMPTS {
if self.process.try_wait()?.is_some() {
if process.try_wait()?.is_some() {
return Ok(());
}
std::thread::sleep(SHUTDOWN_RETRY_TIMEOUT);
}

self.process.kill()?;
process.kill().await?;

for _ in 0..SHUTDOWN_RETRY_ATTEMPTS {
if self.process.try_wait()?.is_some() {
if process.try_wait()?.is_some() {
return Ok(());
}
std::thread::sleep(SHUTDOWN_RETRY_TIMEOUT);
Expand Down Expand Up @@ -241,8 +265,27 @@ impl TestServer {

impl Drop for TestServerImpl {
fn drop(&mut self) {
let _ = Self::shutdown_server(self);
let _ = Self::remove_dir_if_exists(&self.dir);
static DROP_RUNTIME: std::sync::OnceLock<tokio::runtime::Runtime> =
std::sync::OnceLock::new();

if let Some(p) = self.process.take() {
let address = self.address.clone();
let dir = self.dir.clone();

let f = DROP_RUNTIME
.get_or_init(|| tokio::runtime::Runtime::new().unwrap())
.spawn(async move {
let _ = Self::shutdown_server(p, address)
.await
.inspect_err(|e| println!("{e:?}"));
});

while !f.is_finished() {
std::thread::sleep(SHUTDOWN_RETRY_TIMEOUT * 10);
}

let _ = Self::remove_dir_if_exists(&dir).inspect_err(|e| println!("{e:?}"));
}
}
}

Expand Down

0 comments on commit 78588b0

Please sign in to comment.