Skip to content

Commit

Permalink
refac: parameterize ports (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
lffg committed Jun 20, 2024
1 parent cf02dc8 commit d9d2fec
Show file tree
Hide file tree
Showing 18 changed files with 165 additions and 129 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::IpAddr;
use std::net::SocketAddr;

use clap::{Parser, Subcommand};

Expand All @@ -20,15 +20,15 @@ pub enum Cmd {
pub enum NodeCmd {
List,
Show {
address: IpAddr,
address: SocketAddr,
},
#[clap(subcommand)]
Worker(WorkerCmd),
}

#[derive(Debug, Subcommand)]
pub enum WorkerCmd {
Remove { address: IpAddr },
Remove { address: SocketAddr },
}

#[derive(Debug, Subcommand)]
Expand Down
1 change: 1 addition & 0 deletions ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ utils.workspace = true

axum.workspace = true
chrono.workspace = true
clap.workspace = true
eyre.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
16 changes: 16 additions & 0 deletions ctl/src/args.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use clap::Parser;

#[derive(Debug, Parser)]
pub struct CtlArgs {
/// This controller node's HTTP server port.
///
/// If not provided, a random port will be chosen.
#[arg(long)]
pub http_port: Option<u16>,

/// This controller node's Balancer server port.
///
/// If not provided, a random port will be chosen.
#[arg(long)]
pub balancer_port: Option<u16>,
}
14 changes: 7 additions & 7 deletions ctl/src/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use std::{collections::HashMap, net::IpAddr};
use std::{collections::HashMap, net::SocketAddr};

use chrono::{DateTime, Utc};
use proto::{
Expand All @@ -13,7 +13,7 @@ use tracing::instrument;
pub struct Discovery {
rx: mpsc::Receiver<Msg>,
// TODO: Add more information on workers
workers: HashMap<IpAddr, Metrics>,
workers: HashMap<SocketAddr, Metrics>,
services: HashMap<ServiceId, ServiceInfo>,
instances: HashMap<InstanceId, InstanceInfo>,
deployments: HashMap<DeploymentId, DeploymentInfo>,
Expand Down Expand Up @@ -74,12 +74,12 @@ impl DiscoveryHandle {
}

#[allow(dead_code)] // TODO: Remove
pub async fn add_worker(&self, addr: IpAddr, metrics: Metrics) {
pub async fn add_worker(&self, addr: SocketAddr, metrics: Metrics) {
self.send(Msg::WorkerAdd(addr, metrics)).await;
}

#[allow(dead_code)] // TODO: Remove
pub async fn drop_worker(&self, addr: IpAddr) {
pub async fn drop_worker(&self, addr: SocketAddr) {
self.send(Msg::WorkerDrop(addr)).await;
}

Expand All @@ -94,8 +94,8 @@ impl DiscoveryHandle {
#[derive(Debug)]
#[allow(clippy::enum_variant_names)] // remove this once more variants are added
enum Msg {
WorkerAdd(IpAddr, Metrics),
WorkerDrop(IpAddr),
WorkerAdd(SocketAddr, Metrics),
WorkerDrop(SocketAddr),
WorkerQuery(oneshot::Sender<Vec<WorkerDetails>>),
// TODO: add service and instance operations
}
Expand Down Expand Up @@ -135,6 +135,6 @@ pub struct DeploymentInfo {

#[derive(Debug)]
pub struct WorkerDetails {
pub addr: IpAddr,
pub addr: SocketAddr,
pub metrics: Metrics,
}
10 changes: 3 additions & 7 deletions ctl/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use axum::{routing::post, Router};
use proto::well_known::CTL_HTTP_PORT;
use utils::server;

use crate::discovery::DiscoveryHandle;

Expand All @@ -12,8 +10,8 @@ pub struct HttpState {
pub discovery: DiscoveryHandle,
}

pub async fn run_server(state: HttpState) {
let app = Router::new()
pub fn mk_app(state: HttpState) -> Router {
Router::new()
.route("/worker/push-metrics", post(worker::push_metrics))
.nest(
"/deployer",
Expand All @@ -22,7 +20,5 @@ pub async fn run_server(state: HttpState) {
.route("/terminate-service", post(deployer::terminate_service))
.route("/status", post(deployer::report_instance_status)),
)
.with_state(state);

server::listen("controller http", app, ("0.0.0.0", CTL_HTTP_PORT)).await;
.with_state(state)
}
6 changes: 3 additions & 3 deletions ctl/src/http/worker.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use axum::Json;
use proto::{common::node::Metrics, ctl::worker::PushWorkerMetricsRes};
use proto::ctl::worker::{PushWorkerMetricsReq, PushWorkerMetricsRes};
use tracing::info;

pub async fn push_metrics(Json(payload): Json<Metrics>) -> Json<PushWorkerMetricsRes> {
pub async fn push_metrics(Json(payload): Json<PushWorkerMetricsReq>) -> Json<PushWorkerMetricsRes> {
info!("{payload:#?}");
todo!()
Json(PushWorkerMetricsRes {})
}
42 changes: 31 additions & 11 deletions ctl/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,50 @@
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
};

use clap::Parser;
use tokio::task::JoinSet;
use tracing::info;
use utils::server::mk_listener;

use crate::{discovery::Discovery, http::HttpState};
use crate::{args::CtlArgs, discovery::Discovery, http::HttpState};

mod args;
mod discovery;
mod http;

const ANY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));

#[tokio::main]
async fn main() {
async fn main() -> eyre::Result<()> {
utils::setup::tracing();
info!("started controller");

let (discovery, discovery_handle) = Discovery::new();
let args = Arc::new(CtlArgs::parse());
info!(?args, "started ctl");

let (_balancer_listener, _balancer_port) = mk_listener(ANY_IP, args.balancer_port).await?;
let (http_listener, http_port) = mk_listener(ANY_IP, args.http_port).await?;

let discovery_actor_handle = tokio::spawn(async move {
let mut bag = JoinSet::new();

let (discovery, discovery_handle) = Discovery::new();
bag.spawn(async move {
discovery.run().await;
});

let http_handle = tokio::spawn({
bag.spawn(async move {
let state = HttpState {
discovery: discovery_handle.clone(),
};
async move {
http::run_server(state).await;
}
let app = http::mk_app(state);
info!("ctl http listening at {ANY_IP}:{http_port}");
axum::serve(http_listener, app).await.unwrap();
});

discovery_actor_handle.await.unwrap();
http_handle.await.unwrap();
while let Some(res) = bag.join_next().await {
res?;
}

Ok(())
}
9 changes: 3 additions & 6 deletions proto/src/clients/ctl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::IpAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc};

use chrono::{DateTime, Utc};

Expand All @@ -16,7 +16,6 @@ use crate::{
},
worker::{PushWorkerMetricsReq, PushWorkerMetricsRes},
},
well_known::CTL_HTTP_PORT,
};

#[derive(Clone)]
Expand All @@ -27,10 +26,8 @@ pub struct CtlClient {

impl CtlClient {
#[must_use]
pub fn new(ctl_addr: IpAddr) -> Self {
let base_url = format!("http://{ctl_addr}:{CTL_HTTP_PORT}")
.into_boxed_str()
.into();
pub fn new(ctl_addr: SocketAddr) -> Self {
let base_url = format!("http://{ctl_addr}").into_boxed_str().into();
let client = BaseClient::new();
CtlClient { base_url, client }
}
Expand Down
2 changes: 2 additions & 0 deletions proto/src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ impl BaseClient {
.send()
.await
.wrap_err("failed to send request to worker")?
.error_for_status()
.wrap_err("http request failed")?
.json::<Res>()
.await
.wrap_err("failed to parse response from worker")?;
Expand Down
11 changes: 5 additions & 6 deletions proto/src/clients/worker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::net::IpAddr;
use std::net::SocketAddr;

use crate::{
clients::BaseClient,
common::instance::{InstanceId, InstanceSpec},
well_known::WORKER_HTTP_PORT,
worker::runner::{
DeployInstanceReq, DeployInstanceRes, TerminateInstanceReq, TerminateInstanceRes,
},
Expand All @@ -23,14 +22,14 @@ impl WorkerClient {
}

#[allow(clippy::unused_self)]
fn url(&self, worker: IpAddr, path: &str) -> String {
fn url(&self, worker: SocketAddr, path: &str) -> String {
assert!(path.starts_with('/'));
format!("http://{worker}:{WORKER_HTTP_PORT}{path}")
format!("http://{worker}{path}")
}

pub async fn deploy_instance(
&self,
worker: IpAddr,
worker: SocketAddr,
instance_spec: InstanceSpec,
) -> eyre::Result<DeployInstanceRes> {
let body = DeployInstanceReq { instance_spec };
Expand All @@ -41,7 +40,7 @@ impl WorkerClient {

pub async fn terminate_instance(
&self,
worker: IpAddr,
worker: SocketAddr,
instance_id: InstanceId,
) -> eyre::Result<TerminateInstanceRes> {
let body = TerminateInstanceReq { instance_id };
Expand Down
4 changes: 2 additions & 2 deletions proto/src/common/node.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::net::IpAddr;
use std::net::SocketAddr;

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Node {
pub addr: IpAddr,
pub addr: SocketAddr,
pub kind: NodeKind,
}

Expand Down
4 changes: 2 additions & 2 deletions proto/src/ctl/deployer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, net::IpAddr};
use std::{collections::HashMap, net::SocketAddr};

use serde::{Deserialize, Serialize};
use uuid::Uuid;
Expand Down Expand Up @@ -34,7 +34,7 @@ pub enum RedeploymentPolicy {
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployServiceRes {
pub deployment_id: DeploymentId,
pub instances: HashMap<InstanceId, IpAddr>,
pub instances: HashMap<InstanceId, SocketAddr>,
}

/// Stops a given service from running in the system.
Expand Down
6 changes: 0 additions & 6 deletions proto/src/well_known.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,3 @@ use std::time::Duration;
pub const GRACEFUL_SHUTDOWN_DEADLINE: Duration = Duration::from_secs(20);

pub const PROXY_INSTANCE_HEADER_NAME: &str = "X-Tuc-Inst";

// TODO: These should be parameterized through args
pub const CTL_HTTP_PORT: u16 = 7070;
pub const CTL_BALANCER_PORT: u16 = 8080;
pub const WORKER_HTTP_PORT: u16 = 7071;
pub const WORKER_PROXY_PORT: u16 = 8081;
45 changes: 24 additions & 21 deletions utils/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
use std::{convert::Infallible, future::IntoFuture, io};
use std::net::IpAddr;

use axum::{
extract::Request,
response::Response,
serve::{IncomingStream, Serve},
};
use tokio::net::{TcpListener, ToSocketAddrs};
use tower::Service;
use tracing::info;
use eyre::Context;
use tokio::net::TcpListener;

pub async fn listen<A, M, S>(name: &'static str, mk_svc: M, addr: A)
where
A: ToSocketAddrs,
M: for<'a> Service<IncomingStream<'a>, Error = Infallible, Response = S>,
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
S::Future: Send,
Serve<M, S>: IntoFuture<Output = io::Result<()>>,
{
let listener = TcpListener::bind(addr).await.unwrap();
let addr = listener.local_addr().unwrap();
info!("{name} listening at {addr}");
axum::serve(listener, mk_svc).await.unwrap();
/// Creates a new TCP listener.
///
/// Tries to use the provided port, if any. If the provided port is already in
/// use, this method will return an error.
///
/// If no port is provided, a random one will be chosen by the OS.
pub async fn mk_listener(
addr: impl Into<IpAddr>,
port: Option<u16>,
) -> eyre::Result<(TcpListener, u16)> {
let addr = addr.into();
let port = port.unwrap_or(0);

let listener = TcpListener::bind((addr, port))
.await
.wrap_err("failed to start tcp listener")?;

let local_addr = listener.local_addr().expect("local addr must exist");
let port = local_addr.port();

Ok((listener, port))
}
Loading

0 comments on commit d9d2fec

Please sign in to comment.