Skip to content

Commit

Permalink
Refactor parts of Plane into a plane-common crate (DIS-2967) (#849)
Browse files Browse the repository at this point in the history
The main `plane` crate contains a client, but depending on it in another
project requires bringing in _all_ of Plane, including the openssl
dependency. This breaks the client (and other pieces) out into a
`plane-common` crate with the parts needed to "consume" Plane as a
client, and a crate for Plane itself (in particular, without the
database and ACME stuff, which bring in other dependencies that can be
hard to wrangle).

This looks like a huge PR but there is essentially no net new code here;
files are just moved around. A few notes:
- I got rid of `plane-dynamic`; it was a hack to make tests compile
faster but it should be less necessary now that we have broken up Plane
into several crates.
- Roughly, the criteria for "what goes in common" is "the client and
everything it depends on". `plane` depends on `plane-common`, but
`plane-common` does NOT depend on `plane`, so things like
`ExponentialBackoff` (used by the client to manage reconnects) go in
`plane-common`.

Todo:
- [x] get `test_get_metrics` and `backend_lifecycle` tests to pass.
- [x] rename to `plane-common`?
- [x] p2 PR for refactor and ensure p2 integration tests pass
  • Loading branch information
paulgb authored Dec 3, 2024
1 parent 377ae64 commit 0a43883
Show file tree
Hide file tree
Showing 102 changed files with 893 additions and 586 deletions.
383 changes: 323 additions & 60 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[workspace]
resolver = "2"
members = [
"common",
"dynamic-proxy",
"plane/plane-tests",
"plane/plane-dynamic",
"plane",
"plane/plane-tests/plane-test-macro",
]
Expand Down
29 changes: 29 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "plane-common"
version = "0.5.1"
edition = "2021"

[dependencies]
axum = { version = "0.7.7", features = ["ws"] }
bollard = "0.17.0"
chrono = { version = "0.4.31", features = ["serde"] }
clap = "4.4.10"
data-encoding = "2.4.0"
futures-util = "0.3.29"
rand = "0.8.5"
reqwest = { version = "0.12.8", features = ["json"] }
serde = "1.0.109"
serde_json = "1.0.107"
serde_with = "3.4.0"
thiserror = "1.0.50"
tokio = { version = "1.33.0", features = ["sync"] }
tokio-tungstenite = "0.24.0"
tracing = "0.1.40"
tungstenite = "0.24.0"
url = "2.4.1"
valuable = { version = "0.1.0", features = ["derive"] }

[dev-dependencies]
anyhow = "1.0.93"
async-stream = "0.3.6"
axum = "0.7.9"
File renamed without changes.
File renamed without changes.
68 changes: 68 additions & 0 deletions common/src/exponential_backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::time::{Duration, SystemTime};

pub struct ExponentialBackoff {
initial_duration_millis: u128,
max_duration: Duration,
defer_duration: Duration,
multiplier: f64,
step: i32,
deferred_reset: Option<SystemTime>,
}

impl ExponentialBackoff {
pub fn new(
initial_duration: Duration,
max_duration: Duration,
multiplier: f64,
defer_duration: Duration,
) -> Self {
let initial_duration_millis = initial_duration.as_millis();

Self {
initial_duration_millis,
max_duration,
multiplier,
step: 0,
defer_duration,
deferred_reset: None,
}
}

/// Reset the backoff, but only if `wait` is not called again for at least `defer_duration`.
pub fn defer_reset(&mut self) {
self.deferred_reset = Some(SystemTime::now() + self.defer_duration);
}

pub async fn wait(&mut self) {
if let Some(deferred_reset) = self.deferred_reset {
self.deferred_reset = None;
if SystemTime::now() > deferred_reset {
self.reset();
return;
}
}

let duration = self.initial_duration_millis as f64 * self.multiplier.powi(self.step);
let duration = Duration::from_millis(duration as u64);
let duration = duration.min(self.max_duration);
tokio::time::sleep(duration).await;

self.step += 1;
}

pub fn reset(&mut self) {
self.deferred_reset = None;
self.step = 0;
}
}

impl Default for ExponentialBackoff {
fn default() -> Self {
Self::new(
Duration::from_secs(1),
Duration::from_secs(60),
1.1,
Duration::from_secs(60),
)
}
}
12 changes: 11 additions & 1 deletion plane/src/client/mod.rs → common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use self::controller_address::AuthorizedAddress;
use crate::{
controller::{error::ApiError, StatusResponse},
names::{BackendName, DroneName},
protocol::{MessageFromDns, MessageFromDrone, MessageFromProxy},
typed_socket::client::TypedSocketConnector,
Expand All @@ -9,11 +8,22 @@ use crate::{
ConnectResponse, DrainResult, DronePoolName, RevokeRequest,
},
};
use protocol::{ApiError, StatusResponse};
use reqwest::{Response, StatusCode};
use serde::de::DeserializeOwned;
use url::{form_urlencoded, Url};

pub mod controller_address;
pub mod exponential_backoff;
pub mod log_types;
pub mod names;
pub mod protocol;
pub mod serialization;
pub mod sse;
pub mod typed_socket;
pub mod types;
pub mod util;
pub mod version;

#[derive(thiserror::Error, Debug)]
pub enum PlaneClientError {
Expand Down
11 changes: 1 addition & 10 deletions plane/src/log_types.rs → common/src/log_types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, time::SystemTime};
use time::OffsetDateTime;
use std::net::SocketAddr;
use valuable::{Tuplable, TupleDef, Valuable, Value, Visit};

// See: https://github.com/tokio-rs/valuable/issues/86#issuecomment-1760446976
Expand All @@ -27,14 +26,6 @@ impl Tuplable for LoggableTime {
}
}

impl From<OffsetDateTime> for LoggableTime {
fn from(offset: OffsetDateTime) -> Self {
let t: SystemTime = offset.into();
let dt: DateTime<Utc> = t.into();
Self(dt)
}
}

#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd)]
pub struct BackendAddr(pub SocketAddr);

Expand Down
24 changes: 13 additions & 11 deletions plane/src/names.rs → common/src/names.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{drone::runtime::docker::types::ContainerId, types::NodeKind};
use crate::types::NodeKind;
use clap::error::ErrorKind;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
Expand All @@ -17,8 +17,9 @@ pub enum NameError {
InvalidCharacter(char, usize),

#[error(
"too long ({0} characters; max is {} including prefix)",
MAX_NAME_LENGTH
"too long ({length} characters; max is {max} including prefix)",
length = "{0}",
max = MAX_NAME_LENGTH
)]
TooLong(usize),
}
Expand Down Expand Up @@ -163,17 +164,18 @@ entity_name!(DroneName, Some("dr"));
entity_name!(AcmeDnsServerName, Some("ns"));
entity_name!(BackendActionName, Some("ak"));

impl TryFrom<ContainerId> for BackendName {
type Error = NameError;

fn try_from(value: ContainerId) -> Result<Self, Self::Error> {
value
.as_str()
impl BackendName {
pub fn from_container_id(container_id: String) -> Result<Self, NameError> {
container_id
.strip_prefix("plane-")
.ok_or_else(|| NameError::InvalidPrefix(value.to_string(), "plane-".to_string()))?
.ok_or_else(|| NameError::InvalidPrefix(container_id.clone(), "plane-".to_string()))?
.to_string()
.try_into()
}

pub fn to_container_id(&self) -> String {
format!("plane-{}", self)
}
}

pub trait NodeName: Name {
Expand Down Expand Up @@ -294,7 +296,7 @@ mod tests {

#[test]
fn test_backend_name_from_invalid_container_id() {
let container_id = ContainerId::from("invalid-123".to_string());
let container_id = "invalid-123".to_string();
assert_eq!(
Err(NameError::InvalidPrefix(
"invalid-123".to_string(),
Expand Down
71 changes: 69 additions & 2 deletions plane/src/protocol.rs → common/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,45 @@
use std::fmt::Display;

use crate::{
database::backend::{BackendActionMessage, BackendMetricsMessage},
log_types::{BackendAddr, LoggableTime},
names::{BackendActionName, BackendName},
typed_socket::ChannelMessage,
types::{
backend_state::TerminationReason, BackendState, BearerToken, ClusterName, KeyConfig,
SecretToken, Subdomain, TerminationKind,
NodeId, SecretToken, Subdomain, TerminationKind,
},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum ApiErrorKind {
FailedToAcquireKey,
KeyUnheldNoSpawnConfig,
KeyHeldUnhealthy,
KeyHeld,
NoDroneAvailable,
FailedToRemoveKey,
DatabaseError,
NoClusterProvided,
NotFound,
InvalidClusterName,
Other,
}

#[derive(thiserror::Error, Debug, Serialize, Deserialize)]
pub struct ApiError {
pub id: String,
pub kind: ApiErrorKind,
pub message: String,
}

impl Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

#[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable, PartialEq)]
pub struct KeyDeadlines {
/// When the key should be renewed.
Expand Down Expand Up @@ -132,6 +161,29 @@ pub enum MessageFromDrone {
RenewKey(RenewKeyRequest),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BackendMetricsMessage {
pub backend_id: BackendName,
/// Memory used by backend excluding inactive file cache, same as use shown by docker stats
/// ref: https://github.com/docker/cli/blob/master/cli/command/container/stats_helpers.go#L227C45-L227C45
pub mem_used: u64,
/// Memory used by backend in bytes
/// (calculated using kernel memory used by cgroup + page cache memory used by cgroup)
pub mem_total: u64,
/// Active memory (non reclaimable)
pub mem_active: u64,
/// Inactive memory (reclaimable)
pub mem_inactive: u64,
/// Unevictable memory (mlock etc)
pub mem_unevictable: u64,
/// The backend's memory limit
pub mem_limit: u64,
/// Nanoseconds of CPU used by backend since last message
pub cpu_used: u64,
/// Total CPU nanoseconds for system since last message
pub sys_cpu: u64,
}

impl ChannelMessage for MessageFromDrone {
type Reply = MessageToDrone;
}
Expand All @@ -146,6 +198,14 @@ pub struct RenewKeyResponse {
pub deadlines: Option<KeyDeadlines>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendActionMessage {
pub action_id: BackendActionName,
pub backend_id: BackendName,
pub drone_id: NodeId,
pub action: BackendAction,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum MessageToDrone {
Action(BackendActionMessage),
Expand Down Expand Up @@ -263,3 +323,10 @@ pub enum MessageToDns {
impl ChannelMessage for MessageToDns {
type Reply = MessageFromDns;
}

#[derive(Serialize, Deserialize, Debug)]
pub struct StatusResponse {
pub status: String,
pub version: String,
pub hash: String,
}
File renamed without changes.
2 changes: 1 addition & 1 deletion plane/src/client/sse.rs → common/src/sse.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::PlaneClientError;
use crate::util::ExponentialBackoff;
use crate::exponential_backoff::ExponentialBackoff;
use reqwest::{
header::{HeaderValue, ACCEPT, CONNECTION},
Client, Response,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use super::{ChannelMessage, Handshake, SocketAction, TypedSocket};
use crate::client::controller_address::AuthorizedAddress;
use crate::client::PlaneClientError;
use crate::controller_address::AuthorizedAddress;
use crate::exponential_backoff::ExponentialBackoff;
use crate::names::NodeName;
use crate::{plane_version_info, util::ExponentialBackoff};
use crate::version::plane_version_info;
use crate::PlaneClientError;
use futures_util::{SinkExt, StreamExt};
use std::marker::PhantomData;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -191,7 +192,7 @@ async fn new_client<T: ChannelMessage>(

#[cfg(test)]
mod test {
use crate::client::controller_address::AuthorizedAddress;
use crate::controller_address::AuthorizedAddress;

#[test]
fn test_url_no_token() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::client::PlaneClientError;
use crate::PlaneVersionInfo;
use crate::version::PlaneVersionInfo;
use crate::PlaneClientError;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
Expand Down
Loading

0 comments on commit 0a43883

Please sign in to comment.