Skip to content

Commit

Permalink
refac: centralize http clients in the proto crate (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
lffg authored Jun 20, 2024
1 parent b26072a commit 06c69da
Show file tree
Hide file tree
Showing 26 changed files with 296 additions and 108 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bollard = "0.16.1"
clap = { version = "4.5", features = ["derive"] }
chrono = { version = "0.4.38", default-features = false, features = [
"std",
"now",
"serde",
] }
eyre = "0.6"
Expand Down Expand Up @@ -46,3 +47,4 @@ module_name_repetitions = "allow"
cast_precision_loss = "allow"
unused_async = "allow"
enum_glob_use = "allow"
missing_errors_doc = "allow"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ cargo run -p ctl
Worker:

```
cargo run -p worker -- --controller-addr '127.0.0.1:3000'
cargo run -p worker -- --controller-addr '127.0.0.1'
```
29 changes: 29 additions & 0 deletions ctl/src/http/deployer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use axum::{extract::State, Json};
use proto::ctl::deployer::{
DeployServiceReq, DeployServiceRes, ReportDeployInstanceStatusReq,
ReportDeployInstanceStatusRes, TerminateServiceReq, TerminateServiceRes,
};

use crate::http::HttpState;

pub async fn report_instance_status(
State(state): State<HttpState>,
Json(_payload): Json<ReportDeployInstanceStatusReq>,
) -> Json<ReportDeployInstanceStatusRes> {
_ = state.discovery;
todo!();
}

pub async fn deploy_service(
State(_state): State<HttpState>,
Json(_payload): Json<DeployServiceReq>,
) -> Json<DeployServiceRes> {
todo!();
}

pub async fn terminate_service(
State(_state): State<HttpState>,
Json(_payload): Json<TerminateServiceReq>,
) -> Json<TerminateServiceRes> {
todo!();
}
12 changes: 0 additions & 12 deletions ctl/src/http/deployer/mod.rs

This file was deleted.

10 changes: 8 additions & 2 deletions ctl/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ pub struct HttpState {

pub async fn run_server(state: HttpState) {
let app = Router::new()
.route("/worker/metrics", post(worker::push_metrics))
.route("/deploy", post(deployer::deploy))
.route("/worker/push-metrics", post(worker::push_metrics))
.nest(
"/deployer",
Router::new()
.route("/deploy-service", post(deployer::deploy_service))
.route("/terminate-service", post(deployer::terminate_service))
.route("/status", post(deployer::report_instance_status)),
)
.with_state(state);

let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
Expand Down
8 changes: 8 additions & 0 deletions ctl/src/http/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use axum::Json;
use proto::{common::node::Metrics, ctl::worker::PushWorkerMetricsRes};
use tracing::info;

pub async fn push_metrics(Json(payload): Json<Metrics>) -> Json<PushWorkerMetricsRes> {
info!("{payload:#?}");
todo!()
}
7 changes: 0 additions & 7 deletions ctl/src/http/worker/mod.rs

This file was deleted.

4 changes: 3 additions & 1 deletion proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ workspace = true

[dependencies]
chrono.workspace = true
serde.workspace = true
eyre.workspace = true
reqwest.workspace = true
serde_json.workspace = true
serde.workspace = true
uuid.workspace = true
92 changes: 92 additions & 0 deletions proto/src/clients/ctl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{net::IpAddr, sync::Arc};

use chrono::{DateTime, Utc};

use crate::{
clients::BaseClient,
common::{
instance::{self, InstanceId},
node::Metrics,
service::{ServiceId, ServiceSpec},
},
ctl::{
deployer::{
DeployServiceReq, DeployServiceRes, RedeploymentPolicy, ReportDeployInstanceStatusReq,
ReportDeployInstanceStatusRes, TerminateServiceReq, TerminateServiceRes,
},
worker::{PushWorkerMetricsReq, PushWorkerMetricsRes},
},
well_known::CTL_HTTP_PORT,
};

#[derive(Clone)]
pub struct CtlClient {
base_url: Arc<str>,
client: BaseClient,
}

impl CtlClient {
#[must_use]
pub fn new(ctl_addr: IpAddr) -> Self {
let base_url = format!("http://{ctl_addr}:{CTL_HTTP_PORT}")
.into_boxed_str()
.into();
let client = BaseClient::new();
CtlClient { base_url, client }
}

fn url(&self, path: &str) -> String {
assert!(path.starts_with('/'));
format!("{base}{path}", base = self.base_url)
}

pub async fn push_metrics(
&self,
metrics: Metrics,
recorded_at: DateTime<Utc>,
) -> eyre::Result<PushWorkerMetricsRes> {
let body = PushWorkerMetricsReq {
metrics,
recorded_at,
};
self.client
.send(self.url("/worker/push-metrics"), &body)
.await
}

pub async fn deploy_service(
&self,
service_spec: ServiceSpec,
redeployment_policy: RedeploymentPolicy,
) -> eyre::Result<DeployServiceRes> {
let body = DeployServiceReq {
service_spec,
redeployment_policy,
};
self.client
.send(self.url("/deployer/deploy-service"), &body)
.await
}

pub async fn terminate_service(
&self,
service_id: ServiceId,
) -> eyre::Result<TerminateServiceRes> {
let body = TerminateServiceReq { service_id };
self.client
.send(self.url("/deployer/terminate-service"), &body)
.await
}

pub async fn report_instance_status(
&self,
instance_id: InstanceId,
status: instance::Status,
) -> eyre::Result<ReportDeployInstanceStatusRes> {
let body = ReportDeployInstanceStatusReq {
instance_id,
status,
};
self.client.send(self.url("/deployer/status"), &body).await
}
}
41 changes: 41 additions & 0 deletions proto/src/clients/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
mod ctl;
pub use ctl::CtlClient;

mod worker;
use eyre::Context as _;
use serde::{de::DeserializeOwned, Serialize};
pub use worker::WorkerClient;

#[derive(Clone)]
pub struct BaseClient {
client: reqwest::Client,
}

impl BaseClient {
#[must_use]
fn new() -> Self {
let client = reqwest::Client::new();
Self { client }
}

/// Sends a request to the given path, on the given worker.
///
/// Paths must start with a `/`.
async fn send<Req, Res>(&self, url: impl AsRef<str>, body: &Req) -> eyre::Result<Res>
where
Req: Serialize,
Res: DeserializeOwned,
{
let res = self
.client
.post(url.as_ref())
.json(body)
.send()
.await
.wrap_err("failed to send request to worker")?
.json::<Res>()
.await
.wrap_err("failed to parse response from worker")?;
Ok(res)
}
}
52 changes: 52 additions & 0 deletions proto/src/clients/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::net::IpAddr;

use crate::{
clients::BaseClient,
common::instance::{InstanceId, InstanceSpec},
well_known::WORKER_HTTP_PORT,
worker::runner::{
DeployInstanceReq, DeployInstanceRes, TerminateInstanceReq, TerminateInstanceRes,
},
};

#[derive(Clone)]
pub struct WorkerClient {
client: BaseClient,
}

impl WorkerClient {
#[must_use]
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let client = BaseClient::new();
WorkerClient { client }
}

#[allow(clippy::unused_self)]
fn url(&self, worker: IpAddr, path: &str) -> String {
assert!(path.starts_with('/'));
format!("http://{worker}:{WORKER_HTTP_PORT}{path}")
}

pub async fn deploy_instance(
&self,
worker: IpAddr,
instance_spec: InstanceSpec,
) -> eyre::Result<DeployInstanceRes> {
let body = DeployInstanceReq { instance_spec };
self.client
.send(self.url(worker, "/runner/deploy-instance"), &body)
.await
}

pub async fn terminate_instance(
&self,
worker: IpAddr,
instance_id: InstanceId,
) -> eyre::Result<TerminateInstanceRes> {
let body = TerminateInstanceReq { instance_id };
self.client
.send(self.url(worker, "/runner/terminate-instance"), &body)
.await
}
}
8 changes: 4 additions & 4 deletions proto/src/ctl/deployer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct DeploymentId(pub Uuid);

/// Starts a new deploy in the system.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployReq {
pub struct DeployServiceReq {
pub service_spec: ServiceSpec,
pub redeployment_policy: RedeploymentPolicy,
}
Expand All @@ -32,20 +32,20 @@ pub enum RedeploymentPolicy {

/// Response for [`DeployReq`].
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployRes {
pub struct DeployServiceRes {
pub deployment_id: DeploymentId,
pub instances_mapping: HashMap<InstanceId, SocketAddr>,
}

/// Stops a given service from running in the system.
#[derive(Debug, Serialize, Deserialize)]
pub struct TerminateReq {
pub struct TerminateServiceReq {
pub service_id: ServiceId,
}

/// Response for [`TerminateReq`].
#[derive(Debug, Serialize, Deserialize)]
pub struct TerminateRes {}
pub struct TerminateServiceRes {}

#[derive(Debug, Serialize, Deserialize)]
pub struct ReportDeployInstanceStatusReq {
Expand Down
1 change: 1 addition & 0 deletions proto/src/ctl/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod deployer;
pub mod worker;
17 changes: 6 additions & 11 deletions proto/src/ctl/worker.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::common::node::{Metrics, NodeName};
use crate::common::node::Metrics;

/// Pushes new metrics of a given **worker** node.
/// Pushes new metrics of a given worker node.
///
/// The server must validate whether the node name corresponds to the
/// appropriate node address. If they don't match, the operation fails.
/// The controller associates the provided metrics to the node that sent them,
/// using its peer IP address.
///
/// The server *may* ignore older requests that are received out-of-order with
/// respect to the `recorded_at` field.
/// The controller server *may* ignore older requests that are received
/// out-of-order with respect to the `recorded_at` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct PushWorkerMetricsReq {
pub node_name: NodeName,
pub metrics: Metrics,
/// The number of services that are being executed on the node.
pub services: HashMap<String, u32 /* todo: more info? */>,
pub recorded_at: DateTime<Utc>,
}

Expand Down
3 changes: 3 additions & 0 deletions proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ pub mod ctl;
pub mod etc;
pub mod well_known;
pub mod worker;

/// Clients for accessing worker and controller APIs.
pub mod clients;
4 changes: 4 additions & 0 deletions proto/src/well_known.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ pub const GRACEFUL_SHUTDOWN_DEADLINE: Duration = Duration::from_secs(20);

pub const PROXY_INSTANCE_HEADER_NAME: &str = "X-Tuc-Inst";

pub const CTL_HTTP_PORT: u16 = 6968;
pub const CTL_BALANCER_PORT: u16 = 8080;

pub const WORKER_PROXY_PORT: u16 = 8080;
pub const WORKER_HTTP_PORT: u16 = 6969;
Loading

0 comments on commit 06c69da

Please sign in to comment.