Skip to content

Commit

Permalink
[api] Allow customizing reqwest client #1414 (#1415)
Browse files Browse the repository at this point in the history
allow customizing reqwest client
  • Loading branch information
michaelvlach authored Dec 29, 2024
1 parent 8cb5ce8 commit 83fb8d3
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 19 deletions.
4 changes: 4 additions & 0 deletions agdb_api/rust/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ impl ReqwestClient {
client: reqwest::Client::new(),
}
}

pub fn with_client(client: reqwest::Client) -> Self {
Self { client }
}
}

#[cfg(feature = "reqwest")]
Expand Down
14 changes: 9 additions & 5 deletions agdb_server/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl ClusterNodeImpl {
address: &str,
token: &str,
responses: UnboundedSender<(Request<ClusterAction>, Response)>,
) -> Self {
) -> ServerResult<Self> {
let base = if address.starts_with("http") || address.starts_with("https") {
address.to_string()
} else {
Expand All @@ -91,15 +91,19 @@ impl ClusterNodeImpl {

let (requests_sender, requests_receiver) = tokio::sync::mpsc::unbounded_channel();

Self {
client: ReqwestClient::new(),
Ok(Self {
client: ReqwestClient::with_client(
reqwest::Client::builder()
.connect_timeout(Duration::from_secs(60))
.build()?,
),
url: format!("{base}api/v1/cluster"),
base_url: base.trim_end_matches("/").to_string(),
token: Some(token.to_string()),
requests_sender,
requests_receiver: RwLock::new(requests_receiver),
responses,
}
})
}

fn bad_request(message: &str) -> AxumResponse {
Expand Down Expand Up @@ -194,7 +198,7 @@ pub(crate) async fn new(config: &Config, db: &ServerDb, db_pool: &DbPool) -> Ser
node.as_str(),
&config.cluster_token,
requests.clone(),
)));
)?));
}

Some(RwLock::new(responses))
Expand Down
30 changes: 27 additions & 3 deletions agdb_server/tests/routes/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,35 @@ use agdb_api::DbUserRole;
use agdb_api::ReqwestClient;
use assert_cmd::cargo::CommandCargoExt;
use std::process::Command;
use std::time::Duration;

#[tokio::test]
async fn rebalance() -> anyhow::Result<()> {
let mut servers = create_cluster(3).await?;
let mut leader = AgdbApi::new(ReqwestClient::new(), &servers[0].address);
let mut leader = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&servers[0].address,
);
leader.user_login(ADMIN, ADMIN).await?;
leader.admin_shutdown().await?;
assert!(servers[0].process.wait()?.success());

let mut statuses = Vec::with_capacity(servers.len());

for server in &servers[1..] {
let status = wait_for_leader(&AgdbApi::new(ReqwestClient::new(), &server.address)).await?;
let status = wait_for_leader(&AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
))
.await?;
statuses.push(status);
}

Expand All @@ -43,7 +59,15 @@ async fn rebalance() -> anyhow::Result<()> {
statuses.clear();

for server in &servers {
let status = wait_for_leader(&AgdbApi::new(ReqwestClient::new(), &server.address)).await?;
let status = wait_for_leader(&AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
))
.await?;
statuses.push(status);
}

Expand Down
55 changes: 49 additions & 6 deletions agdb_server/tests/routes/misc_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use reqwest::StatusCode;
use std::collections::HashMap;
use std::path::Path;
use std::process::Command;
use std::time::Duration;

#[tokio::test]
async fn missing() -> anyhow::Result<()> {
Expand Down Expand Up @@ -73,7 +74,14 @@ async fn openapi() -> anyhow::Result<()> {
#[tokio::test]
async fn config_reuse() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);
client.user_login(ADMIN, ADMIN).await?;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
Expand All @@ -87,7 +95,14 @@ async fn config_reuse() -> anyhow::Result<()> {
#[tokio::test]
async fn db_list_after_shutdown() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);

{
client.user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -117,7 +132,14 @@ async fn db_list_after_shutdown() -> anyhow::Result<()> {
#[tokio::test]
async fn db_list_after_shutdown_corrupted_data() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);

{
client.user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -168,7 +190,14 @@ async fn basepath_test() -> anyhow::Result<()> {
#[tokio::test]
async fn location_change_after_restart() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);

{
client.user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -213,7 +242,14 @@ async fn location_change_after_restart() -> anyhow::Result<()> {
#[tokio::test]
async fn reset_admin_password() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);

{
client.user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -245,7 +281,14 @@ async fn reset_admin_password() -> anyhow::Result<()> {
#[tokio::test]
async fn memory_db_from_backup() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);
let owner = "user1";
let db = "db1";

Expand Down
40 changes: 35 additions & 5 deletions agdb_server/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,14 @@ impl TestServerImpl {
};

let mut process = Command::cargo_bin(BINARY)?.current_dir(&dir).spawn()?;
let api = AgdbApi::new(ReqwestClient::new(), &api_address);
let api = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&api_address,
);

for _ in 0..RETRY_ATTEMPS {
match api.status().await {
Expand Down Expand Up @@ -202,7 +209,14 @@ impl TestServer {
let server = server_guard.as_ref().unwrap();

Ok(Self {
api: AgdbApi::new(ReqwestClient::new(), &server.address),
api: AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
),
dir: server.dir.clone(),
data_dir: server.data_dir.clone(),
})
Expand Down Expand Up @@ -241,8 +255,17 @@ impl TestCluster {
.unwrap()
.0
.iter()
.map(|s| AgdbApi::new(ReqwestClient::new(), &s.address))
.collect(),
.map(|s| {
Ok(AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&s.address,
))
})
.collect::<anyhow::Result<Vec<AgdbApi<ReqwestClient>>>>()?,
};

cluster.apis[1].cluster_user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -318,7 +341,14 @@ pub async fn create_cluster(nodes: usize) -> anyhow::Result<Vec<TestServerImpl>>
.map(|c| tokio::spawn(async move { TestServerImpl::with_config(c).await }))
{
let server = server.await??;
let api = AgdbApi::new(ReqwestClient::new(), &server.address);
let api = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);
servers.push((server, api));
}

Expand Down

0 comments on commit 83fb8d3

Please sign in to comment.