Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refac: centralize http clients in the proto crate #51

Merged
merged 2 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bollard = "0.16.1"
clap = { version = "4.5", features = ["derive"] }
chrono = { version = "0.4.38", default-features = false, features = [
"std",
"now",
"serde",
] }
eyre = "0.6"
Expand Down Expand Up @@ -46,3 +47,4 @@ module_name_repetitions = "allow"
cast_precision_loss = "allow"
unused_async = "allow"
enum_glob_use = "allow"
missing_errors_doc = "allow"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ cargo run -p ctl
Worker:

```
cargo run -p worker -- --controller-addr '127.0.0.1:3000'
cargo run -p worker -- --controller-addr '127.0.0.1'
```
29 changes: 29 additions & 0 deletions ctl/src/http/deployer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use axum::{extract::State, Json};
use proto::ctl::deployer::{
DeployServiceReq, DeployServiceRes, ReportDeployInstanceStatusReq,
ReportDeployInstanceStatusRes, TerminateServiceReq, TerminateServiceRes,
};

use crate::http::HttpState;

pub async fn report_instance_status(
State(state): State<HttpState>,
Json(_payload): Json<ReportDeployInstanceStatusReq>,
) -> Json<ReportDeployInstanceStatusRes> {
_ = state.discovery;
todo!();
}

pub async fn deploy_service(
State(_state): State<HttpState>,
Json(_payload): Json<DeployServiceReq>,
) -> Json<DeployServiceRes> {
todo!();
}

pub async fn terminate_service(
State(_state): State<HttpState>,
Json(_payload): Json<TerminateServiceReq>,
) -> Json<TerminateServiceRes> {
todo!();
}
12 changes: 0 additions & 12 deletions ctl/src/http/deployer/mod.rs

This file was deleted.

10 changes: 8 additions & 2 deletions ctl/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ pub struct HttpState {

pub async fn run_server(state: HttpState) {
let app = Router::new()
.route("/worker/metrics", post(worker::push_metrics))
.route("/deploy", post(deployer::deploy))
.route("/worker/push-metrics", post(worker::push_metrics))
.nest(
"/deployer",
Router::new()
.route("/deploy-service", post(deployer::deploy_service))
.route("/terminate-service", post(deployer::terminate_service))
.route("/status", post(deployer::report_instance_status)),
)
.with_state(state);

let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
Expand Down
8 changes: 8 additions & 0 deletions ctl/src/http/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use axum::Json;
use proto::{common::node::Metrics, ctl::worker::PushWorkerMetricsRes};
use tracing::info;

pub async fn push_metrics(Json(payload): Json<Metrics>) -> Json<PushWorkerMetricsRes> {
info!("{payload:#?}");
todo!()
}
7 changes: 0 additions & 7 deletions ctl/src/http/worker/mod.rs

This file was deleted.

4 changes: 3 additions & 1 deletion proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ workspace = true

[dependencies]
chrono.workspace = true
serde.workspace = true
eyre.workspace = true
reqwest.workspace = true
serde_json.workspace = true
serde.workspace = true
uuid.workspace = true
92 changes: 92 additions & 0 deletions proto/src/clients/ctl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{net::IpAddr, sync::Arc};

use chrono::{DateTime, Utc};

use crate::{
clients::BaseClient,
common::{
instance::{self, InstanceId},
node::Metrics,
service::{ServiceId, ServiceSpec},
},
ctl::{
deployer::{
DeployServiceReq, DeployServiceRes, RedeploymentPolicy, ReportDeployInstanceStatusReq,
ReportDeployInstanceStatusRes, TerminateServiceReq, TerminateServiceRes,
},
worker::{PushWorkerMetricsReq, PushWorkerMetricsRes},
},
well_known::CTL_HTTP_PORT,
};

#[derive(Clone)]
pub struct CtlClient {
base_url: Arc<str>,
client: BaseClient,
}

impl CtlClient {
#[must_use]
pub fn new(ctl_addr: IpAddr) -> Self {
let base_url = format!("http://{ctl_addr}:{CTL_HTTP_PORT}")
.into_boxed_str()
.into();
let client = BaseClient::new();
CtlClient { base_url, client }
}

fn url(&self, path: &str) -> String {
assert!(path.starts_with('/'));
format!("{base}{path}", base = self.base_url)
}

pub async fn push_metrics(
&self,
metrics: Metrics,
recorded_at: DateTime<Utc>,
) -> eyre::Result<PushWorkerMetricsRes> {
let body = PushWorkerMetricsReq {
metrics,
recorded_at,
};
self.client
.send(self.url("/worker/push-metrics"), &body)
.await
}

pub async fn deploy_service(
&self,
service_spec: ServiceSpec,
redeployment_policy: RedeploymentPolicy,
) -> eyre::Result<DeployServiceRes> {
let body = DeployServiceReq {
service_spec,
redeployment_policy,
};
self.client
.send(self.url("/deployer/deploy-service"), &body)
.await
}

pub async fn terminate_service(
&self,
service_id: ServiceId,
) -> eyre::Result<TerminateServiceRes> {
let body = TerminateServiceReq { service_id };
self.client
.send(self.url("/deployer/terminate-service"), &body)
.await
}

pub async fn report_instance_status(
&self,
instance_id: InstanceId,
status: instance::Status,
) -> eyre::Result<ReportDeployInstanceStatusRes> {
let body = ReportDeployInstanceStatusReq {
instance_id,
status,
};
self.client.send(self.url("/deployer/status"), &body).await
}
}
41 changes: 41 additions & 0 deletions proto/src/clients/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
mod ctl;
pub use ctl::CtlClient;

mod worker;
use eyre::Context as _;
use serde::{de::DeserializeOwned, Serialize};
pub use worker::WorkerClient;

#[derive(Clone)]
pub struct BaseClient {
client: reqwest::Client,
}

impl BaseClient {
#[must_use]
fn new() -> Self {
let client = reqwest::Client::new();
Self { client }
}

/// Sends a request to the given path, on the given worker.
///
/// Paths must start with a `/`.
async fn send<Req, Res>(&self, url: impl AsRef<str>, body: &Req) -> eyre::Result<Res>
where
Req: Serialize,
Res: DeserializeOwned,
{
let res = self
.client
.post(url.as_ref())
.json(body)
.send()
.await
.wrap_err("failed to send request to worker")?
.json::<Res>()
.await
.wrap_err("failed to parse response from worker")?;
Ok(res)
}
}
52 changes: 52 additions & 0 deletions proto/src/clients/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::net::IpAddr;

use crate::{
clients::BaseClient,
common::instance::{InstanceId, InstanceSpec},
well_known::WORKER_HTTP_PORT,
worker::runner::{
DeployInstanceReq, DeployInstanceRes, TerminateInstanceReq, TerminateInstanceRes,
},
};

#[derive(Clone)]
pub struct WorkerClient {
client: BaseClient,
}

impl WorkerClient {
#[must_use]
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let client = BaseClient::new();
WorkerClient { client }
}

#[allow(clippy::unused_self)]
fn url(&self, worker: IpAddr, path: &str) -> String {
assert!(path.starts_with('/'));
format!("http://{worker}:{WORKER_HTTP_PORT}{path}")
}

pub async fn deploy_instance(
&self,
worker: IpAddr,
instance_spec: InstanceSpec,
) -> eyre::Result<DeployInstanceRes> {
let body = DeployInstanceReq { instance_spec };
self.client
.send(self.url(worker, "/runner/deploy-instance"), &body)
.await
}

pub async fn terminate_instance(
&self,
worker: IpAddr,
instance_id: InstanceId,
) -> eyre::Result<TerminateInstanceRes> {
let body = TerminateInstanceReq { instance_id };
self.client
.send(self.url(worker, "/runner/terminate-instance"), &body)
.await
}
}
8 changes: 4 additions & 4 deletions proto/src/ctl/deployer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct DeploymentId(pub Uuid);

/// Starts a new deploy in the system.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployReq {
pub struct DeployServiceReq {
pub service_spec: ServiceSpec,
pub redeployment_policy: RedeploymentPolicy,
}
Expand All @@ -32,20 +32,20 @@ pub enum RedeploymentPolicy {

/// Response for [`DeployReq`].
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployRes {
pub struct DeployServiceRes {
pub deployment_id: DeploymentId,
pub instances_mapping: HashMap<InstanceId, SocketAddr>,
}

/// Stops a given service from running in the system.
#[derive(Debug, Serialize, Deserialize)]
pub struct TerminateReq {
pub struct TerminateServiceReq {
pub service_id: ServiceId,
}

/// Response for [`TerminateReq`].
#[derive(Debug, Serialize, Deserialize)]
pub struct TerminateRes {}
pub struct TerminateServiceRes {}

#[derive(Debug, Serialize, Deserialize)]
pub struct ReportDeployInstanceStatusReq {
Expand Down
1 change: 1 addition & 0 deletions proto/src/ctl/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod deployer;
pub mod worker;
17 changes: 6 additions & 11 deletions proto/src/ctl/worker.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::common::node::{Metrics, NodeName};
use crate::common::node::Metrics;

/// Pushes new metrics of a given **worker** node.
/// Pushes new metrics of a given worker node.
///
/// The server must validate whether the node name corresponds to the
/// appropriate node address. If they don't match, the operation fails.
/// The controller associates the provided metrics to the node that sent them,
/// using its peer IP address.
///
/// The server *may* ignore older requests that are received out-of-order with
/// respect to the `recorded_at` field.
/// The controller server *may* ignore older requests that are received
/// out-of-order with respect to the `recorded_at` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct PushWorkerMetricsReq {
pub node_name: NodeName,
pub metrics: Metrics,
/// The number of services that are being executed on the node.
pub services: HashMap<String, u32 /* todo: more info? */>,
pub recorded_at: DateTime<Utc>,
}

Expand Down
3 changes: 3 additions & 0 deletions proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ pub mod ctl;
pub mod etc;
pub mod well_known;
pub mod worker;

/// Clients for accessing worker and controller APIs.
pub mod clients;
4 changes: 4 additions & 0 deletions proto/src/well_known.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ pub const GRACEFUL_SHUTDOWN_DEADLINE: Duration = Duration::from_secs(20);

pub const PROXY_INSTANCE_HEADER_NAME: &str = "X-Tuc-Inst";

pub const CTL_HTTP_PORT: u16 = 6968;
pub const CTL_BALANCER_PORT: u16 = 8080;

pub const WORKER_PROXY_PORT: u16 = 8080;
pub const WORKER_HTTP_PORT: u16 = 6969;
Loading