From 4df2b873dc37affbd869a115677f2c23b4a09a42 Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Sat, 20 Jan 2024 12:34:41 -0500 Subject: [PATCH 1/6] Initial structured logging support --- .cargo/config | 4 ++ Cargo.lock | 42 +++++++++++++++++++++ dev/controller.sh | 7 +++- dev/drone.sh | 8 +++- dev/proxy.sh | 7 +++- plane/Cargo.toml | 3 +- plane/src/controller/drone.rs | 11 ++++-- plane/src/database/connect.rs | 11 ++++-- plane/src/database/subscribe.rs | 2 +- plane/src/drone/backend_manager.rs | 15 +++++--- plane/src/drone/docker/commands.rs | 2 +- plane/src/drone/key_manager.rs | 15 ++++---- plane/src/drone/mod.rs | 14 +++++-- plane/src/drone/state_store.rs | 4 +- plane/src/init_tracing.rs | 24 +++++++++--- plane/src/lib.rs | 1 + plane/src/log_types.rs | 47 +++++++++++++++++++++++ plane/src/names.rs | 11 +++++- plane/src/protocol.rs | 15 ++++---- plane/src/typed_socket/mod.rs | 12 +++--- plane/src/types.rs | 60 +++++++++++++++++++++++------- 21 files changed, 254 insertions(+), 61 deletions(-) create mode 100644 .cargo/config create mode 100644 plane/src/log_types.rs diff --git a/.cargo/config b/.cargo/config new file mode 100644 index 000000000..6a5eea4e7 --- /dev/null +++ b/.cargo/config @@ -0,0 +1,4 @@ +# See: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/index.html#unstable-features + +[build] +rustflags = ["--cfg", "tracing_unstable"] diff --git a/Cargo.lock b/Cargo.lock index 276905000..fbd63bbb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1664,6 +1664,7 @@ dependencies = [ "trust-dns-server", "tungstenite", "url", + "valuable", "x509-parser", ] @@ -2826,6 +2827,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", + "valuable", + "valuable-serde", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -2836,12 +2849,17 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", + "valuable", + "valuable-serde", ] [[package]] @@ -3002,6 +3020,30 @@ name = "valuable" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +dependencies = [ + "valuable-derive", +] + +[[package]] +name = "valuable-derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d44690c645190cfce32f91a1582281654b2338c6073fa250b0949fd25c55b32" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "valuable-serde" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5285cfff30cdabe26626736a54d989687dd9cab84f51f4048b61d6d0ae8b0907" +dependencies = [ + "serde", + "valuable", +] [[package]] name = "vcpkg" diff --git a/dev/controller.sh b/dev/controller.sh index fc67d2814..d11ade326 100755 --- a/dev/controller.sh +++ b/dev/controller.sh @@ -1,3 +1,8 @@ #!/bin/sh -cargo run -- controller --db postgres://postgres@localhost "$@" --default-cluster localhost:9090 +PLANE_LOG_JSON=true \ + cargo run -- \ + controller \ + --db postgres://postgres@localhost \ + --default-cluster localhost:9090 \ + "$@" diff --git a/dev/drone.sh b/dev/drone.sh index 3e6058883..fe3b5f95d 100755 --- a/dev/drone.sh +++ b/dev/drone.sh @@ -1,3 +1,9 @@ #!/bin/sh -cargo run -- drone --controller-url ws://localhost:8080/ --cluster "localhost:9090" --db "drone.sqlite" "$@" +PLANE_LOG_JSON=true \ + cargo run -- \ + drone \ + --controller-url ws://localhost:8080/ \ + --cluster "localhost:9090" \ + --db "drone.sqlite" \ + "$@" diff --git a/dev/proxy.sh b/dev/proxy.sh index 5e91a2d38..4511d4b55 100755 --- a/dev/proxy.sh +++ b/dev/proxy.sh @@ -1,3 +1,8 @@ #!/bin/sh -cargo run -- proxy --controller-url ws://localhost:8080/ --cluster "localhost:9090" "$@" +PLANE_LOG_JSON=true \ + cargo run -- \ + proxy \ + --controller-url ws://localhost:8080/ \ + --cluster "localhost:9090" \ + "$@" diff --git a/plane/Cargo.toml b/plane/Cargo.toml index ccf5097b0..f0b6f8977 100644 --- a/plane/Cargo.toml +++ b/plane/Cargo.toml @@ -44,8 +44,9 @@ tokio-stream = "0.1.14" tokio-tungstenite = { version = "0.20.1", features = ["rustls-tls-webpki-roots"] } tower-http = { version = "0.4.4", features = ["trace", "cors"] } tracing = "0.1.40" -tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json", "valuable"] } trust-dns-server = "0.23.2" tungstenite = "0.20.1" url = "2.4.1" x509-parser = "0.15.1" +valuable = { version = "0.1.0", features = ["derive"] } diff --git a/plane/src/controller/drone.rs b/plane/src/controller/drone.rs index ab564cd6c..f2239a092 100644 --- a/plane/src/controller/drone.rs +++ b/plane/src/controller/drone.rs @@ -9,6 +9,7 @@ use crate::{ subscribe::Subscription, PlaneDatabase, }, + log_types::LoggableTime, protocol::{ BackendAction, Heartbeat, KeyDeadlines, MessageFromDrone, MessageToDrone, RenewKeyResponse, }, @@ -77,9 +78,13 @@ pub async fn handle_message_from_drone( .await?; let deadlines = KeyDeadlines { - renew_at: renew_key_request.local_time + KEY_LEASE_RENEW_AFTER, - soft_terminate_at: renew_key_request.local_time + KEY_LEASE_SOFT_TERMINATE_AFTER, - hard_terminate_at: renew_key_request.local_time + KEY_LEASE_HARD_TERMINATE_AFTER, + renew_at: LoggableTime(renew_key_request.local_time + KEY_LEASE_RENEW_AFTER), + soft_terminate_at: LoggableTime( + renew_key_request.local_time + KEY_LEASE_SOFT_TERMINATE_AFTER, + ), + hard_terminate_at: LoggableTime( + renew_key_request.local_time + KEY_LEASE_HARD_TERMINATE_AFTER, + ), }; let renew_key_response = RenewKeyResponse { diff --git a/plane/src/database/connect.rs b/plane/src/database/connect.rs index ff8bfd5ef..f73c6d71b 100644 --- a/plane/src/database/connect.rs +++ b/plane/src/database/connect.rs @@ -9,6 +9,7 @@ use crate::{ backend_key::{KeysDatabase, KEY_LEASE_EXPIRATION}, drone::DroneDatabase, }, + log_types::LoggableTime, names::{BackendName, Name}, protocol::{AcquiredKey, BackendAction, KeyDeadlines}, types::{ @@ -129,9 +130,13 @@ async fn create_backend_with_key( let acquired_key = AcquiredKey { key: key.clone(), deadlines: KeyDeadlines { - renew_at: drone_for_spawn.last_local_time + KEY_LEASE_RENEW_AFTER, - soft_terminate_at: drone_for_spawn.last_local_time + KEY_LEASE_SOFT_TERMINATE_AFTER, - hard_terminate_at: drone_for_spawn.last_local_time + KEY_LEASE_SOFT_TERMINATE_AFTER, + renew_at: LoggableTime(drone_for_spawn.last_local_time + KEY_LEASE_RENEW_AFTER), + soft_terminate_at: LoggableTime( + drone_for_spawn.last_local_time + KEY_LEASE_SOFT_TERMINATE_AFTER, + ), + hard_terminate_at: LoggableTime( + drone_for_spawn.last_local_time + KEY_LEASE_SOFT_TERMINATE_AFTER, + ), }, token: result.fencing_token, }; diff --git a/plane/src/database/subscribe.rs b/plane/src/database/subscribe.rs index ae79e21bb..c622e5702 100644 --- a/plane/src/database/subscribe.rs +++ b/plane/src/database/subscribe.rs @@ -269,7 +269,7 @@ impl EventSubscriptionManager { let mut listeners = self.listeners.write().expect("Listener map is poisoned."); let key = (kind.clone(), key.map(|s| s.to_string())); - tracing::info!(?key, "Subscribing to event"); + tracing::info!(kind = key.0, key = key.1, "Subscribing to event"); match listeners.entry(key.clone()) { std::collections::hash_map::Entry::Occupied(entry) => { diff --git a/plane/src/drone/backend_manager.rs b/plane/src/drone/backend_manager.rs index cdad4e2c1..c934f7082 100644 --- a/plane/src/drone/backend_manager.rs +++ b/plane/src/drone/backend_manager.rs @@ -18,6 +18,7 @@ use std::{ net::IpAddr, sync::{Arc, Mutex, RwLock}, }; +use valuable::Valuable; /// The backend manager uses a state machine internally to manage the state of the backend. /// Each time we enter a state, we can do one of three things: @@ -233,7 +234,7 @@ impl BackendManager { }) } BackendStatus::Waiting => StepStatusResult::future_status(async move { - let address = match state.address { + let address = match state.address() { Some(address) => address, None => { tracing::error!("State is waiting, but no associated address."); @@ -278,19 +279,23 @@ impl BackendManager { } } - pub fn set_state(self: &Arc, status: BackendState) { - tracing::info!(?self.backend_id, ?status, "Updating backend state"); + pub fn set_state(self: &Arc, state: BackendState) { + tracing::info!( + backend_id = self.backend_id.as_value(), + state = state.as_value(), + "Updating backend state" + ); let mut handle = self.handle.lock().expect("Guard handle lock is poisoned"); // Cancel any existing task. handle.take(); // Call the callback. - if let Err(err) = (self.state_callback)(&status) { + if let Err(err) = (self.state_callback)(&state) { tracing::error!(?err, "Error calling state callback."); return; } - let result = self.step_state(status); + let result = self.step_state(state); match result { StepStatusResult::DoNothing => {} StepStatusResult::SetState(status) => { diff --git a/plane/src/drone/docker/commands.rs b/plane/src/drone/docker/commands.rs index 5f618aef4..8b39fae27 100644 --- a/plane/src/drone/docker/commands.rs +++ b/plane/src/drone/docker/commands.rs @@ -149,7 +149,7 @@ fn get_container_config_from_executor_config( .resource_limits .cpu_time_limit .map(|cpu_time_limit| { - let Ok(secs) = cpu_time_limit.as_secs().try_into() else { + let Ok(secs) = cpu_time_limit.0.as_secs().try_into() else { tracing::warn!( "unable to convert cpu_time_limit: {:?} to i64 for use in ulimit", cpu_time_limit diff --git a/plane/src/drone/key_manager.rs b/plane/src/drone/key_manager.rs index 36c9146e2..e5aaa477a 100644 --- a/plane/src/drone/key_manager.rs +++ b/plane/src/drone/key_manager.rs @@ -12,6 +12,7 @@ use std::{ time::{Duration, SystemTime}, }; use tokio::time::sleep; +use valuable::Valuable; pub struct KeyManager { executor: Arc, @@ -33,7 +34,7 @@ async fn renew_key_loop( loop { let now = SystemTime::now(); let deadlines = &key.deadlines; - if now >= deadlines.hard_terminate_at { + if now >= deadlines.hard_terminate_at.0 { tracing::warn!("Key {:?} has expired, hard-terminating.", key.key); if let Err(err) = executor .apply_action( @@ -51,7 +52,7 @@ async fn renew_key_loop( break; } - if now >= deadlines.soft_terminate_at { + if now >= deadlines.soft_terminate_at.0 { tracing::warn!("Key {:?} has expired, soft-terminating.", key.key); if let Err(err) = executor .apply_action( @@ -67,15 +68,15 @@ async fn renew_key_loop( continue; } - if let Ok(time_to_sleep) = deadlines.hard_terminate_at.duration_since(now) { + if let Ok(time_to_sleep) = deadlines.hard_terminate_at.0.duration_since(now) { sleep(time_to_sleep).await; } continue; } - if now >= deadlines.renew_at { - tracing::info!("Renewing key {:?}.", key.key); + if now >= deadlines.renew_at.0 { + tracing::info!(key = key.key.as_value(), "Renewing key."); if let Some(ref sender) = sender { let request = RenewKeyRequest { @@ -88,13 +89,13 @@ async fn renew_key_loop( } } - if let Ok(time_to_sleep) = deadlines.soft_terminate_at.duration_since(now) { + if let Ok(time_to_sleep) = deadlines.soft_terminate_at.0.duration_since(now) { sleep(time_to_sleep).await; } continue; } - if let Ok(time_to_sleep) = deadlines.renew_at.duration_since(now) { + if let Ok(time_to_sleep) = deadlines.renew_at.0.duration_since(now) { sleep(time_to_sleep).await; } } diff --git a/plane/src/drone/mod.rs b/plane/src/drone/mod.rs index 2fe6a1d8c..0fc50f0b5 100644 --- a/plane/src/drone/mod.rs +++ b/plane/src/drone/mod.rs @@ -23,6 +23,7 @@ use std::{ sync::{Arc, Mutex}, }; use tokio::task::JoinHandle; +use valuable::Valuable; mod backend_manager; pub mod docker; @@ -87,7 +88,11 @@ pub async fn drone_loop( action, .. }) => { - tracing::info!(?backend_id, ?action, "Received action."); + tracing::info!( + backend_id = backend_id.as_value(), + action = action.as_value(), + "Received action." + ); if let BackendAction::Spawn { key, .. } = &action { // Register the key with the key manager, ensuring that it will be refreshed. @@ -108,14 +113,17 @@ pub async fn drone_loop( } } MessageToDrone::AckEvent { event_id } => { - tracing::info!(?event_id, "Received status ack."); + tracing::info!(event_id = event_id.as_value(), "Received status ack."); if let Err(err) = executor.ack_event(event_id) { tracing::error!(?err, "Error acking event."); } } MessageToDrone::RenewKeyResponse(renew_key_response) => { let RenewKeyResponse { backend, deadlines } = renew_key_response; - tracing::info!(?deadlines, "Received key renewal response."); + tracing::info!( + deadlines = deadlines.as_value(), + "Received key renewal response." + ); if let Some(deadlines) = deadlines { key_manager diff --git a/plane/src/drone/state_store.rs b/plane/src/drone/state_store.rs index 1f981c582..f73db37f9 100644 --- a/plane/src/drone/state_store.rs +++ b/plane/src/drone/state_store.rs @@ -106,7 +106,7 @@ impl StateStore { event_id, backend_id: backend_id.clone(), status: state.status, - address: state.address, + address: state.address(), exit_code: state.exit_code, timestamp, }; @@ -170,7 +170,7 @@ impl StateStore { event_id: BackendEventId::from(event_id), backend_id: BackendName::try_from(backend_id)?, status: state.status, - address: state.address, + address: state.address(), exit_code: state.exit_code, timestamp: DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(timestamp), }; diff --git a/plane/src/init_tracing.rs b/plane/src/init_tracing.rs index 80dcdac75..ccdb2b1b3 100644 --- a/plane/src/init_tracing.rs +++ b/plane/src/init_tracing.rs @@ -8,10 +8,24 @@ pub fn init_tracing() { .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); - let layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::FULL); + // Use JSON if PLANE_LOG_JSON is set to anything other than "false". + let use_json = std::env::var("PLANE_LOG_JSON") + .map(|s| s != "false") + .unwrap_or_default(); - tracing_subscriber::registry() - .with(layer) - .with(filter) - .init(); + if use_json { + let layer = tracing_subscriber::fmt::layer().json(); + tracing_subscriber::registry() + .with(layer) + .with(filter) + .init(); + return; + } else { + let layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::FULL); + + tracing_subscriber::registry() + .with(layer) + .with(filter) + .init(); + } } diff --git a/plane/src/lib.rs b/plane/src/lib.rs index c900fd5ac..01374df79 100644 --- a/plane/src/lib.rs +++ b/plane/src/lib.rs @@ -12,6 +12,7 @@ pub mod dns; pub mod drone; pub mod heartbeat_consts; pub mod init_tracing; +pub mod log_types; pub mod names; pub mod protocol; pub mod proxy; diff --git a/plane/src/log_types.rs b/plane/src/log_types.rs new file mode 100644 index 000000000..ffa75433a --- /dev/null +++ b/plane/src/log_types.rs @@ -0,0 +1,47 @@ +use serde::{Deserialize, Serialize}; +use std::{net::SocketAddr, time::SystemTime}; +use valuable::{Tuplable, TupleDef, Valuable, Value, Visit}; + +// See: https://github.com/tokio-rs/valuable/issues/86#issuecomment-1760446976 + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct LoggableTime(pub SystemTime); + +impl Valuable for LoggableTime { + fn as_value(&self) -> Value<'_> { + Value::Tuplable(self) + } + + fn visit(&self, visit: &mut dyn Visit) { + let s: String = format!("{:?}", self.0); + let val = Value::String(s.as_str()); + visit.visit_unnamed_fields(&[val]); + } +} + +impl Tuplable for LoggableTime { + fn definition(&self) -> TupleDef { + TupleDef::new_static(1) + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +pub struct BackendAddr(pub SocketAddr); + +impl valuable::Valuable for BackendAddr { + fn as_value(&self) -> valuable::Value { + Value::Tuplable(self) + } + + fn visit(&self, visit: &mut dyn valuable::Visit) { + let s: String = format!("{:?}", self.0); + let val = valuable::Value::String(s.as_str()); + visit.visit_unnamed_fields(&[val]); + } +} + +impl Tuplable for BackendAddr { + fn definition(&self) -> valuable::TupleDef { + valuable::TupleDef::new_static(1) + } +} diff --git a/plane/src/names.rs b/plane/src/names.rs index 6ca200270..0954ca3f8 100644 --- a/plane/src/names.rs +++ b/plane/src/names.rs @@ -34,7 +34,16 @@ pub trait Name: macro_rules! entity_name { ($name:ident, $prefix:literal) => { - #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] + #[derive( + Debug, + Clone, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, + valuable::Valuable, + )] pub struct $name(String); impl Name for $name { diff --git a/plane/src/protocol.rs b/plane/src/protocol.rs index ef8f72d8a..c7bb05914 100644 --- a/plane/src/protocol.rs +++ b/plane/src/protocol.rs @@ -1,5 +1,6 @@ use crate::{ database::backend::BackendActionMessage, + log_types::LoggableTime, names::{BackendActionName, BackendName}, typed_socket::ChannelMessage, types::{ @@ -11,19 +12,19 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::{net::SocketAddr, time::SystemTime}; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable)] pub struct KeyDeadlines { /// When the key should be renewed. - pub renew_at: SystemTime, + pub renew_at: LoggableTime, /// When the backend should be soft-terminated if the key could not be renewed. - pub soft_terminate_at: SystemTime, + pub soft_terminate_at: LoggableTime, /// When the backend should be hard-terminated if the key could not be renewed. - pub hard_terminate_at: SystemTime, + pub hard_terminate_at: LoggableTime, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable)] pub struct AcquiredKey { /// Details of the key itself. pub key: KeyConfig, @@ -39,7 +40,7 @@ pub struct AcquiredKey { pub token: i64, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable)] pub enum BackendAction { Spawn { executable: Box, @@ -65,7 +66,7 @@ pub struct BackendStateMessage { pub timestamp: DateTime, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, valuable::Valuable)] pub struct BackendEventId(i64); impl From for BackendEventId { diff --git a/plane/src/typed_socket/mod.rs b/plane/src/typed_socket/mod.rs index fc2d1fb1a..4c20425fa 100644 --- a/plane/src/typed_socket/mod.rs +++ b/plane/src/typed_socket/mod.rs @@ -113,15 +113,15 @@ impl Handshake { pub fn check_compat(&self, other: &Handshake) { if self.version.version != other.version.version { tracing::warn!( - "Client and server have different Plane versions: {} (local) != {} (remote).", - self.version.version, - other.version.version + local_version = self.version.version, + remote_version = other.version.version, + "Client and server have different Plane versions." ); } else if self.version.git_hash != other.version.git_hash { tracing::warn!( - "Client and server have different Plane git hashes: {} (local) != {} (remote).", - self.version.git_hash, - other.version.git_hash + local_version = self.version.git_hash, + remote_version = other.version.git_hash, + "Client and server have different Plane git hashes.", ); } } diff --git a/plane/src/types.rs b/plane/src/types.rs index ead43bb11..e651a7083 100644 --- a/plane/src/types.rs +++ b/plane/src/types.rs @@ -1,4 +1,6 @@ -use crate::{client::PlaneClient, names::BackendName, util::random_prefixed_string}; +use crate::{ + client::PlaneClient, log_types::BackendAddr, names::BackendName, util::random_prefixed_string, +}; use bollard::auth::DockerCredentials; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -59,7 +61,7 @@ impl FromStr for ClusterName { } } -#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, PartialOrd)] +#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, PartialOrd, valuable::Valuable)] pub enum BackendStatus { /// The backend has been scheduled to a drone, but has not yet been acknowledged. /// This status is only assigned by the controller; the drone will never assign it by definition. @@ -86,18 +88,18 @@ pub enum BackendStatus { Terminated, } -#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq)] +#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, valuable::Valuable)] pub enum TerminationKind { Soft, Hard, } -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, valuable::Valuable)] pub struct BackendState { pub status: BackendStatus, #[serde(skip_serializing_if = "Option::is_none")] - pub address: Option, + pub address: Option, #[serde(skip_serializing_if = "Option::is_none")] pub termination: Option, @@ -107,6 +109,10 @@ pub struct BackendState { } impl BackendState { + pub fn address(&self) -> Option { + self.address.as_ref().map(|BackendAddr(addr)| *addr) + } + pub fn to_loading(&self) -> BackendState { BackendState { status: BackendStatus::Loading, @@ -124,7 +130,7 @@ impl BackendState { pub fn to_waiting(&self, address: SocketAddr) -> BackendState { BackendState { status: BackendStatus::Waiting, - address: Some(address), + address: Some(BackendAddr(address)), ..self.clone() } } @@ -183,7 +189,7 @@ impl Display for BackendStatus { } } -#[derive(Clone, Copy, Serialize, Deserialize, Debug, Default)] +#[derive(Clone, Copy, Serialize, Deserialize, Debug, Default, valuable::Valuable)] pub enum PullPolicy { #[default] IfNotPresent, @@ -198,6 +204,16 @@ pub struct DockerCpuPeriod( #[serde_as(as = "serde_with::DurationMicroSeconds")] std::time::Duration, ); +impl valuable::Valuable for DockerCpuPeriod { + fn as_value(&self) -> valuable::Value { + valuable::Value::U128(self.0.as_micros()) + } + + fn visit(&self, visit: &mut dyn valuable::Visit) { + visit.visit_value(self.as_value()) + } +} + impl Default for DockerCpuPeriod { fn default() -> Self { Self(std::time::Duration::from_millis(100)) @@ -211,7 +227,24 @@ impl From<&DockerCpuPeriod> for std::time::Duration { } #[serde_with::serde_as] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(transparent)] +pub struct DockerCpuTimeLimit( + #[serde_as(as = "serde_with::DurationSeconds")] pub std::time::Duration, +); + +impl valuable::Valuable for DockerCpuTimeLimit { + fn as_value(&self) -> valuable::Value { + valuable::Value::U64(self.0.as_secs()) + } + + fn visit(&self, visit: &mut dyn valuable::Visit) { + visit.visit_value(self.as_value()) + } +} + +#[serde_with::serde_as] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default, valuable::Valuable)] pub struct ResourceLimits { /// Period of cpu time (de/serializes as microseconds) pub cpu_period: Option, @@ -220,8 +253,7 @@ pub struct ResourceLimits { pub cpu_period_percent: Option, /// Total cpu time allocated to container - #[serde_as(as = "Option>")] - pub cpu_time_limit: Option, + pub cpu_time_limit: Option, /// Maximum amount of memory container can use (in bytes) pub memory_limit_bytes: Option, @@ -242,7 +274,7 @@ impl ResourceLimits { } } -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, valuable::Valuable)] #[serde(untagged)] pub enum DockerRegistryAuth { UsernamePassword { username: String, password: String }, @@ -260,7 +292,7 @@ impl From for DockerCredentials { } } -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, valuable::Valuable)] pub struct ExecutorConfig { pub image: String, pub pull_policy: Option, @@ -300,7 +332,9 @@ pub struct SpawnConfig { pub max_idle_seconds: Option, } -#[derive(Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq, Hash)] +#[derive( + Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq, Hash, valuable::Valuable, +)] pub struct KeyConfig { /// If provided, and a running backend was created with the same key, /// cluster, namespace, and tag, we will connect to that backend instead From e9f2bbda58fd16c91de08e732667866b1f4caeed Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Sat, 20 Jan 2024 13:23:34 -0500 Subject: [PATCH 2/6] More structured logging --- plane/plane-tests/tests/backend_actions.rs | 3 +- plane/src/admin.rs | 4 +-- plane/src/controller/backend_status.rs | 3 +- plane/src/controller/dns.rs | 6 +++- plane/src/controller/drone.rs | 14 ++++---- plane/src/controller/proxy.rs | 6 +++- plane/src/database/backend.rs | 12 ++++--- plane/src/database/drone.rs | 4 +-- plane/src/drone/docker/mod.rs | 4 +-- plane/src/drone/heartbeat.rs | 5 +-- plane/src/drone/key_manager.rs | 28 ++++++++++------ plane/src/drone/state_store.rs | 11 ++++--- plane/src/init_tracing.rs | 1 - plane/src/log_types.rs | 16 ++++++++-- plane/src/protocol.rs | 19 ++++++----- plane/src/proxy/cert_manager.rs | 37 ++++++++++++++-------- plane/src/proxy/cert_pair.rs | 7 ++-- plane/src/proxy/proxy_connection.rs | 6 +++- plane/src/types.rs | 11 ++++--- 19 files changed, 121 insertions(+), 76 deletions(-) diff --git a/plane/plane-tests/tests/backend_actions.rs b/plane/plane-tests/tests/backend_actions.rs index e8e5d52f7..ce1156e8c 100644 --- a/plane/plane-tests/tests/backend_actions.rs +++ b/plane/plane-tests/tests/backend_actions.rs @@ -4,6 +4,7 @@ use hyper::StatusCode; use plane::{ client::PlaneClientError, database::backend::BackendActionMessage, + log_types::LoggableTime, names::{DroneName, Name}, protocol::{BackendAction, Heartbeat, MessageFromDrone, MessageToDrone}, types::{ClusterName, ConnectRequest, ConnectResponse, ExecutorConfig, SpawnConfig}, @@ -59,7 +60,7 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) { tracing::info!("Sending initial heartbeat message (mocking the drone)."); drone_connection .send(MessageFromDrone::Heartbeat(Heartbeat { - local_time: Utc::now(), + local_time: LoggableTime(Utc::now()), })) .await .unwrap(); diff --git a/plane/src/admin.rs b/plane/src/admin.rs index 24d0b6514..0c08a6f2c 100644 --- a/plane/src/admin.rs +++ b/plane/src/admin.rs @@ -174,7 +174,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE println!( "Status: {} at {}", status.status.to_string().magenta(), - status.time.to_string().bright_cyan() + status.time.0.to_string().bright_cyan() ); if status.status >= BackendStatus::Ready { @@ -206,7 +206,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE println!( "Status: {} at {}", status.status.to_string().magenta(), - status.time.to_string().bright_cyan() + status.time.0.to_string().bright_cyan() ); if status.status >= BackendStatus::Terminated { diff --git a/plane/src/controller/backend_status.rs b/plane/src/controller/backend_status.rs index eb4c81c56..ce9529b08 100644 --- a/plane/src/controller/backend_status.rs +++ b/plane/src/controller/backend_status.rs @@ -1,5 +1,6 @@ use super::{core::Controller, error::IntoApiError}; use crate::{ + log_types::LoggableTime, names::BackendName, types::{BackendStatus, TimestampedBackendStatus}, }; @@ -29,7 +30,7 @@ async fn backend_status( let result = TimestampedBackendStatus { status: backend.last_status, - time: backend.last_status_time, + time: LoggableTime(backend.last_status_time), }; Ok(result) diff --git a/plane/src/controller/dns.rs b/plane/src/controller/dns.rs index c064015b6..592338315 100644 --- a/plane/src/controller/dns.rs +++ b/plane/src/controller/dns.rs @@ -8,6 +8,7 @@ use axum::{ response::IntoResponse, }; use std::net::{IpAddr, SocketAddr}; +use valuable::Valuable; pub async fn dns_socket_inner( ws: WebSocket, @@ -21,7 +22,10 @@ pub async fn dns_socket_inner( loop { let message_from_dns_result = socket.recv().await; - tracing::info!(?message_from_dns_result, "Handling message from DNS..."); + tracing::info!( + v = message_from_dns_result.as_value(), + "Handling message from DNS..." + ); match message_from_dns_result { Some(MessageFromDns::TxtRecordRequest { cluster }) => { let txt_value = match controller.db.acme().txt_record_for_cluster(&cluster).await { diff --git a/plane/src/controller/drone.rs b/plane/src/controller/drone.rs index f2239a092..b27cfda4a 100644 --- a/plane/src/controller/drone.rs +++ b/plane/src/controller/drone.rs @@ -22,6 +22,7 @@ use axum::{ response::{IntoResponse, Response}, }; use std::net::{IpAddr, SocketAddr}; +use valuable::Valuable; pub async fn handle_message_from_drone( msg: MessageFromDrone, @@ -37,14 +38,11 @@ pub async fn handle_message_from_drone( controller .db .drone() - .heartbeat(drone_id, local_time) + .heartbeat(drone_id, local_time.0) .await?; } MessageFromDrone::BackendEvent(backend_event) => { - tracing::info!( - event = ?backend_event, - "Received backend event" - ); + tracing::info!(event = backend_event.as_value(), "Received backend event"); controller .db @@ -78,12 +76,12 @@ pub async fn handle_message_from_drone( .await?; let deadlines = KeyDeadlines { - renew_at: LoggableTime(renew_key_request.local_time + KEY_LEASE_RENEW_AFTER), + renew_at: LoggableTime(renew_key_request.local_time.0 + KEY_LEASE_RENEW_AFTER), soft_terminate_at: LoggableTime( - renew_key_request.local_time + KEY_LEASE_SOFT_TERMINATE_AFTER, + renew_key_request.local_time.0 + KEY_LEASE_SOFT_TERMINATE_AFTER, ), hard_terminate_at: LoggableTime( - renew_key_request.local_time + KEY_LEASE_HARD_TERMINATE_AFTER, + renew_key_request.local_time.0 + KEY_LEASE_HARD_TERMINATE_AFTER, ), }; diff --git a/plane/src/controller/proxy.rs b/plane/src/controller/proxy.rs index 688e3b87d..8a5a3ddf2 100644 --- a/plane/src/controller/proxy.rs +++ b/plane/src/controller/proxy.rs @@ -14,6 +14,7 @@ use axum::{ response::{IntoResponse, Response}, }; use std::net::{IpAddr, SocketAddr}; +use valuable::Valuable; pub async fn proxy_socket_inner( cluster: ClusterName, @@ -97,7 +98,10 @@ pub async fn proxy_socket_inner( } }; - tracing::info!(?response, "Sending cert manager response"); + tracing::info!( + response = response.as_value(), + "Sending cert manager response" + ); if let Err(err) = socket .send(MessageToProxy::CertManagerResponse(response)) diff --git a/plane/src/database/backend.rs b/plane/src/database/backend.rs index 9169d8bcd..3c49b7232 100644 --- a/plane/src/database/backend.rs +++ b/plane/src/database/backend.rs @@ -1,5 +1,6 @@ use super::{subscribe::emit_with_key, util::MapSqlxError, PlaneDatabase}; use crate::{ + log_types::{BackendAddr, LoggableTime}, names::{BackendActionName, BackendName}, protocol::{BackendAction, RouteInfo}, types::{BackendStatus, BearerToken, NodeId, SecretToken, TimestampedBackendStatus}, @@ -68,7 +69,7 @@ impl<'a> BackendDatabase<'a> { match status { Ok(status) => { yield TimestampedBackendStatus { - time: row.created_at, + time: LoggableTime(row.created_at), status, }; last_status = Some(status); @@ -95,7 +96,7 @@ impl<'a> BackendDatabase<'a> { let item = TimestampedBackendStatus { status, - time, + time: LoggableTime(time), }; yield item; @@ -148,7 +149,7 @@ impl<'a> BackendDatabase<'a> { &self, backend: &BackendName, status: BackendStatus, - address: Option, + address: Option, exit_code: Option, ) -> sqlx::Result<()> { let mut txn = self.db.pool.begin().await?; @@ -167,7 +168,7 @@ impl<'a> BackendDatabase<'a> { "#, backend.to_string(), status.to_string(), - address.map(|a| a.to_string()), + address.map(|a| a.0.to_string()), exit_code, ) .execute(&mut *txn) @@ -351,9 +352,10 @@ impl<'a> BackendDatabase<'a> { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub struct TerminationCandidate { pub backend_id: BackendName, + pub expiration_time: Option>, pub last_keepalive: DateTime, pub allowed_idle_seconds: Option, diff --git a/plane/src/database/drone.rs b/plane/src/database/drone.rs index e0447a573..98c46dd36 100644 --- a/plane/src/database/drone.rs +++ b/plane/src/database/drone.rs @@ -4,7 +4,7 @@ use crate::{ }; use chrono::{DateTime, Utc}; use sqlx::{postgres::types::PgInterval, query, query_as, PgPool}; -use std::time::{Duration, SystemTime}; +use std::time::Duration; pub struct DroneDatabase<'a> { pool: &'a PgPool, @@ -114,5 +114,5 @@ impl<'a> DroneDatabase<'a> { pub struct DroneForSpawn { pub id: NodeId, - pub last_local_time: SystemTime, + pub last_local_time: DateTime, } diff --git a/plane/src/drone/docker/mod.rs b/plane/src/drone/docker/mod.rs index 5e83f263e..1c5e6bacf 100644 --- a/plane/src/drone/docker/mod.rs +++ b/plane/src/drone/docker/mod.rs @@ -72,7 +72,7 @@ impl PlaneDocker { Ok(e) => e, }; - tracing::info!("Received event: {:?}", e); + tracing::info!(event=?e, "Received event"); let Some(actor) = e.actor else { tracing::warn!("Received event without actor."); @@ -101,7 +101,7 @@ impl PlaneDocker { } }; - tracing::info!("Received exit code: {:?}", exit_code); + tracing::info!(exit_code, "Received exit code"); Some(TerminateEvent { backend_id, diff --git a/plane/src/drone/heartbeat.rs b/plane/src/drone/heartbeat.rs index f190aa433..eb7d10199 100644 --- a/plane/src/drone/heartbeat.rs +++ b/plane/src/drone/heartbeat.rs @@ -1,5 +1,6 @@ use crate::{ - heartbeat_consts::HEARTBEAT_INTERVAL, protocol::Heartbeat, typed_socket::TypedSocketSender, + heartbeat_consts::HEARTBEAT_INTERVAL, log_types::LoggableTime, protocol::Heartbeat, + typed_socket::TypedSocketSender, }; use chrono::Utc; use tokio::task::JoinHandle; @@ -13,7 +14,7 @@ impl HeartbeatLoop { pub fn start(sender: TypedSocketSender) -> Self { let handle = tokio::spawn(async move { loop { - let local_time = Utc::now(); + let local_time = LoggableTime(Utc::now()); if let Err(err) = sender.send(Heartbeat { local_time }) { tracing::error!(?err, "failed to send heartbeat"); } diff --git a/plane/src/drone/key_manager.rs b/plane/src/drone/key_manager.rs index e5aaa477a..b9abe5bd5 100644 --- a/plane/src/drone/key_manager.rs +++ b/plane/src/drone/key_manager.rs @@ -1,16 +1,14 @@ use super::executor::Executor; use crate::{ + log_types::LoggableTime, names::BackendName, protocol::{AcquiredKey, BackendAction, KeyDeadlines, RenewKeyRequest}, typed_socket::TypedSocketSender, types::TerminationKind, util::GuardHandle, }; -use std::{ - collections::HashMap, - sync::Arc, - time::{Duration, SystemTime}, -}; +use chrono::Utc; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::time::sleep; use valuable::Valuable; @@ -32,7 +30,7 @@ async fn renew_key_loop( executor: Arc, ) { loop { - let now = SystemTime::now(); + let now = Utc::now(); let deadlines = &key.deadlines; if now >= deadlines.hard_terminate_at.0 { tracing::warn!("Key {:?} has expired, hard-terminating.", key.key); @@ -68,7 +66,12 @@ async fn renew_key_loop( continue; } - if let Ok(time_to_sleep) = deadlines.hard_terminate_at.0.duration_since(now) { + if let Ok(time_to_sleep) = deadlines + .hard_terminate_at + .0 + .signed_duration_since(now) + .to_std() + { sleep(time_to_sleep).await; } @@ -81,7 +84,7 @@ async fn renew_key_loop( if let Some(ref sender) = sender { let request = RenewKeyRequest { backend: backend.clone(), - local_time: SystemTime::now(), + local_time: LoggableTime(Utc::now()), }; if let Err(err) = sender.send(request) { @@ -89,13 +92,18 @@ async fn renew_key_loop( } } - if let Ok(time_to_sleep) = deadlines.soft_terminate_at.0.duration_since(now) { + if let Ok(time_to_sleep) = deadlines + .soft_terminate_at + .0 + .signed_duration_since(now) + .to_std() + { sleep(time_to_sleep).await; } continue; } - if let Ok(time_to_sleep) = deadlines.renew_at.0.duration_since(now) { + if let Ok(time_to_sleep) = deadlines.renew_at.0.signed_duration_since(now).to_std() { sleep(time_to_sleep).await; } } diff --git a/plane/src/drone/state_store.rs b/plane/src/drone/state_store.rs index f73db37f9..9ba18574c 100644 --- a/plane/src/drone/state_store.rs +++ b/plane/src/drone/state_store.rs @@ -1,4 +1,5 @@ use crate::{ + log_types::LoggableTime, names::BackendName, protocol::{BackendEventId, BackendMetricsMessage, BackendStateMessage}, typed_socket::TypedSocketSender, @@ -106,9 +107,9 @@ impl StateStore { event_id, backend_id: backend_id.clone(), status: state.status, - address: state.address(), + address: state.address, exit_code: state.exit_code, - timestamp, + timestamp: LoggableTime(timestamp), }; listener(event_message); @@ -170,9 +171,11 @@ impl StateStore { event_id: BackendEventId::from(event_id), backend_id: BackendName::try_from(backend_id)?, status: state.status, - address: state.address(), + address: state.address, exit_code: state.exit_code, - timestamp: DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(timestamp), + timestamp: LoggableTime( + DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(timestamp), + ), }; result.push(event); diff --git a/plane/src/init_tracing.rs b/plane/src/init_tracing.rs index ccdb2b1b3..727eee53c 100644 --- a/plane/src/init_tracing.rs +++ b/plane/src/init_tracing.rs @@ -19,7 +19,6 @@ pub fn init_tracing() { .with(layer) .with(filter) .init(); - return; } else { let layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::FULL); diff --git a/plane/src/log_types.rs b/plane/src/log_types.rs index ffa75433a..ec6cfe5ea 100644 --- a/plane/src/log_types.rs +++ b/plane/src/log_types.rs @@ -1,11 +1,13 @@ +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::{net::SocketAddr, time::SystemTime}; +use time::OffsetDateTime; use valuable::{Tuplable, TupleDef, Valuable, Value, Visit}; // See: https://github.com/tokio-rs/valuable/issues/86#issuecomment-1760446976 #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct LoggableTime(pub SystemTime); +pub struct LoggableTime(#[serde(with = "chrono::serde::ts_milliseconds")] pub DateTime); impl Valuable for LoggableTime { fn as_value(&self) -> Value<'_> { @@ -13,7 +15,7 @@ impl Valuable for LoggableTime { } fn visit(&self, visit: &mut dyn Visit) { - let s: String = format!("{:?}", self.0); + let s: String = format!("{}", self.0); let val = Value::String(s.as_str()); visit.visit_unnamed_fields(&[val]); } @@ -25,7 +27,15 @@ impl Tuplable for LoggableTime { } } -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +impl From for LoggableTime { + fn from(offset: OffsetDateTime) -> Self { + let t: SystemTime = offset.into(); + let dt: DateTime = t.into(); + Self(dt) + } +} + +#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq)] pub struct BackendAddr(pub SocketAddr); impl valuable::Valuable for BackendAddr { diff --git a/plane/src/protocol.rs b/plane/src/protocol.rs index c7bb05914..0d2ed8b60 100644 --- a/plane/src/protocol.rs +++ b/plane/src/protocol.rs @@ -1,6 +1,6 @@ use crate::{ database::backend::BackendActionMessage, - log_types::LoggableTime, + log_types::{BackendAddr, LoggableTime}, names::{BackendActionName, BackendName}, typed_socket::ChannelMessage, types::{ @@ -8,9 +8,8 @@ use crate::{ TerminationKind, }, }; -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use std::{net::SocketAddr, time::SystemTime}; +use std::net::SocketAddr; #[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable)] pub struct KeyDeadlines { @@ -51,19 +50,19 @@ pub enum BackendAction { }, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable)] pub struct BackendStateMessage { pub event_id: BackendEventId, pub backend_id: BackendName, pub status: BackendStatus, #[serde(skip_serializing_if = "Option::is_none")] - pub address: Option, + pub address: Option, #[serde(skip_serializing_if = "Option::is_none")] pub exit_code: Option, - pub timestamp: DateTime, + pub timestamp: LoggableTime, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, valuable::Valuable)] @@ -85,12 +84,12 @@ impl From for i64 { pub struct RenewKeyRequest { pub backend: BackendName, - pub local_time: SystemTime, + pub local_time: LoggableTime, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Heartbeat { - pub local_time: DateTime, + pub local_time: LoggableTime, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -178,7 +177,7 @@ impl ChannelMessage for CertManagerRequest { type Reply = CertManagerResponse; } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, valuable::Valuable)] pub enum CertManagerResponse { /// Acknowledge a lease request and indicate whether it was accepted. CertLeaseResponse { accepted: bool }, @@ -231,7 +230,7 @@ impl ChannelMessage for MessageToProxy { type Reply = MessageFromProxy; } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, valuable::Valuable)] pub enum MessageFromDns { TxtRecordRequest { cluster: ClusterName }, } diff --git a/plane/src/proxy/cert_manager.rs b/plane/src/proxy/cert_manager.rs index 51af8e5c5..fe0916d7f 100644 --- a/plane/src/proxy/cert_manager.rs +++ b/plane/src/proxy/cert_manager.rs @@ -1,5 +1,6 @@ use super::{cert_pair::CertificatePair, AcmeConfig}; use crate::{ + log_types::LoggableTime, protocol::{CertManagerRequest, CertManagerResponse}, types::ClusterName, }; @@ -8,10 +9,12 @@ use acme2_eab::{ DirectoryBuilder, OrderBuilder, OrderStatus, }; use anyhow::{anyhow, Context, Result}; +use chrono::Utc; use std::{ + ops::Sub, path::{Path, PathBuf}, sync::{Arc, Mutex}, - time::{Duration, SystemTime}, + time::Duration, }; use tokio::sync::{ broadcast, @@ -21,6 +24,7 @@ use tokio_rustls::rustls::{ server::{ClientHello, ResolvesServerCert}, sign::CertifiedKey, }; +use valuable::Valuable; const DNS_01: &str = "dns-01"; @@ -209,21 +213,23 @@ async fn refresh_loop_step( let last_current_cert = send_cert.borrow().clone(); if let Some(current_cert) = last_current_cert { - let renewal_time = current_cert - .validity_end - .checked_sub(RENEWAL_WINDOW) - .expect("Cert validity date arithmetic failed."); - let time_until_renew = renewal_time.duration_since(SystemTime::now())?; + let renewal_time = LoggableTime(current_cert.validity_end.0.sub(RENEWAL_WINDOW)); + let time_until_renew = renewal_time.0.sub(Utc::now()); - if time_until_renew > Duration::ZERO { + if time_until_renew > chrono::Duration::zero() { tracing::info!( - "Certificate for {} is valid until {:?}. Renewal scheduled for {:?} ({:?} from now). Sleeping.", - current_cert.common_name, - current_cert.validity_end, - renewal_time, - time_until_renew + common_name = current_cert.common_name, + validity_end = current_cert.validity_end.as_value(), + renewal_time = renewal_time.as_value(), + days_until_renew = time_until_renew.num_days(), + "Obtained certificate.", ); - tokio::time::sleep(time_until_renew).await; + tokio::time::sleep( + time_until_renew + .to_std() + .expect("time_until_renew is always positive."), + ) + .await; return Ok(()); } } @@ -355,7 +361,10 @@ async fn get_certificate( return Err(anyhow!("Cert manager error.")); } }; - tracing::info!(?response, "Received response from cert manager."); + tracing::info!( + response = response.as_value(), + "Received response from cert manager." + ); match response { CertManagerResponse::SetTxtRecordResponse { accepted: true } => (), diff --git a/plane/src/proxy/cert_pair.rs b/plane/src/proxy/cert_pair.rs index 7cc27e4f1..0066a8d40 100644 --- a/plane/src/proxy/cert_pair.rs +++ b/plane/src/proxy/cert_pair.rs @@ -1,8 +1,9 @@ +use crate::log_types::LoggableTime; use anyhow::{anyhow, Result}; use pem::Pem; use rustls_pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs1KeyDer}; use serde::{Deserialize, Serialize}; -use std::{fs::Permissions, io, os::unix::fs::PermissionsExt, path::Path, time::SystemTime}; +use std::{fs::Permissions, io, os::unix::fs::PermissionsExt, path::Path}; use tokio_rustls::rustls::{ sign::{any_supported_type, CertifiedKey}, Certificate, PrivateKey, @@ -42,8 +43,8 @@ pub struct CertificatePair { pub certified_key: CertifiedKey, pub private_key_der: Vec, pub common_name: String, - pub validity_start: SystemTime, - pub validity_end: SystemTime, + pub validity_start: LoggableTime, + pub validity_end: LoggableTime, } impl CertificatePair { diff --git a/plane/src/proxy/proxy_connection.rs b/plane/src/proxy/proxy_connection.rs index dcc1f9b72..7dadb0ab4 100644 --- a/plane/src/proxy/proxy_connection.rs +++ b/plane/src/proxy/proxy_connection.rs @@ -7,6 +7,7 @@ use crate::{ }; use std::sync::Arc; use tokio::task::JoinHandle; +use valuable::Valuable; pub struct ProxyConnection { handle: JoinHandle<()>, @@ -58,7 +59,10 @@ impl ProxyConnection { state.route_map.receive(response); } MessageToProxy::CertManagerResponse(response) => { - tracing::info!("Received cert manager response: {:?}", response); + tracing::info!( + response = response.as_value(), + "Received cert manager response" + ); cert_manager.receive(response); } } diff --git a/plane/src/types.rs b/plane/src/types.rs index e651a7083..12c634a2b 100644 --- a/plane/src/types.rs +++ b/plane/src/types.rs @@ -1,8 +1,10 @@ use crate::{ - client::PlaneClient, log_types::BackendAddr, names::BackendName, util::random_prefixed_string, + client::PlaneClient, + log_types::{BackendAddr, LoggableTime}, + names::BackendName, + util::random_prefixed_string, }; use bollard::auth::DockerCredentials; -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::{collections::HashMap, fmt::Display, net::SocketAddr, str::FromStr}; @@ -28,7 +30,7 @@ impl Display for NodeId { } } -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash, Eq)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash, valuable::Valuable)] pub struct ClusterName(String); impl ClusterName { @@ -489,6 +491,5 @@ impl TryFrom for NodeKind { pub struct TimestampedBackendStatus { pub status: BackendStatus, - #[serde(with = "chrono::serde::ts_milliseconds")] - pub time: DateTime, + pub time: LoggableTime, } From bd601db6e2b6181db941b9166c58261706903f5e Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Sat, 20 Jan 2024 13:30:35 -0500 Subject: [PATCH 3/6] More structured logging --- plane/src/log_types.rs | 2 +- plane/src/protocol.rs | 2 +- plane/src/proxy/route_map.rs | 7 ++++++- plane/src/types.rs | 4 ++-- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/plane/src/log_types.rs b/plane/src/log_types.rs index ec6cfe5ea..a467095a6 100644 --- a/plane/src/log_types.rs +++ b/plane/src/log_types.rs @@ -35,7 +35,7 @@ impl From for LoggableTime { } } -#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq)] +#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct BackendAddr(pub SocketAddr); impl valuable::Valuable for BackendAddr { diff --git a/plane/src/protocol.rs b/plane/src/protocol.rs index 0d2ed8b60..a377c0f72 100644 --- a/plane/src/protocol.rs +++ b/plane/src/protocol.rs @@ -153,7 +153,7 @@ impl ChannelMessage for MessageToDrone { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct RouteInfo { pub backend_id: BackendName, - pub address: SocketAddr, + pub address: BackendAddr, pub secret_token: SecretToken, pub user: Option, pub user_data: Option, diff --git a/plane/src/proxy/route_map.rs b/plane/src/proxy/route_map.rs index 6196c6106..5cae06a66 100644 --- a/plane/src/proxy/route_map.rs +++ b/plane/src/proxy/route_map.rs @@ -9,6 +9,7 @@ use std::{ sync::{Mutex, RwLock}, }; use tokio::sync::broadcast::Sender; +use valuable::Valuable; const CACHE_SIZE: usize = 1_000; @@ -94,7 +95,11 @@ impl RouteMap { } fn insert(&self, token: BearerToken, route_info: Option) { - tracing::info!(?token, ?route_info, "Inserting route info"); + tracing::info!( + token = token.as_value(), + ?route_info, + "Inserting route info" + ); self.routes .lock() .expect("Routes lock was poisoned.") diff --git a/plane/src/types.rs b/plane/src/types.rs index 12c634a2b..92436d25f 100644 --- a/plane/src/types.rs +++ b/plane/src/types.rs @@ -384,7 +384,7 @@ pub struct ConnectRequest { pub auth: Map, } -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash, valuable::Valuable)] pub struct BearerToken(String); impl From for BearerToken { @@ -399,7 +399,7 @@ impl Display for BearerToken { } } -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, valuable::Valuable)] pub struct SecretToken(String); impl From for SecretToken { From 56613028aca738a85c7432db2f1d37878ba0938b Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Sat, 20 Jan 2024 13:30:57 -0500 Subject: [PATCH 4/6] version bump 0.4.2 --- Cargo.lock | 2 +- plane/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fbd63bbb6..cf785d731 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,7 +1624,7 @@ checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "plane" -version = "0.4.1" +version = "0.4.2" dependencies = [ "acme2-eab", "anyhow", diff --git a/plane/Cargo.toml b/plane/Cargo.toml index f0b6f8977..3ee374413 100644 --- a/plane/Cargo.toml +++ b/plane/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "plane" -version = "0.4.1" +version = "0.4.2" edition = "2021" default-run = "plane" description = "Session backend orchestrator for ambitious browser-based apps." From 640bf5c7ed8aa6c847972c4b230a012c8dbb5d87 Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Sat, 20 Jan 2024 13:31:59 -0500 Subject: [PATCH 5/6] unbreak build --- plane/src/database/backend.rs | 2 +- plane/src/protocol.rs | 1 - plane/src/proxy/proxy_service.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/plane/src/database/backend.rs b/plane/src/database/backend.rs index 3c49b7232..fc6a7bfa8 100644 --- a/plane/src/database/backend.rs +++ b/plane/src/database/backend.rs @@ -282,7 +282,7 @@ impl<'a> BackendDatabase<'a> { Ok(Some(RouteInfo { backend_id: BackendName::try_from(result.backend_id) .map_err(|_| sqlx::Error::Decode("Failed to decode backend name.".into()))?, - address, + address: BackendAddr(address), secret_token: SecretToken::from(result.secret_token), user: result.username, user_data: Some(result.auth), diff --git a/plane/src/protocol.rs b/plane/src/protocol.rs index a377c0f72..f90d7b912 100644 --- a/plane/src/protocol.rs +++ b/plane/src/protocol.rs @@ -9,7 +9,6 @@ use crate::{ }, }; use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; #[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable)] pub struct KeyDeadlines { diff --git a/plane/src/proxy/proxy_service.rs b/plane/src/proxy/proxy_service.rs index 6179975f3..eaa6428e4 100644 --- a/plane/src/proxy/proxy_service.rs +++ b/plane/src/proxy/proxy_service.rs @@ -129,7 +129,7 @@ impl RequestHandler { }; let backend_id = route_info.backend_id.clone(); - request_rewriter.set_authority(route_info.address); + request_rewriter.set_authority(route_info.address.0); let mut response = if request_rewriter.should_upgrade() { let (req, req_clone) = request_rewriter.into_request_pair(&route_info); From 313204df2142234aa280385fbee09d4cc9aa59d8 Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Mon, 22 Jan 2024 12:52:37 -0500 Subject: [PATCH 6/6] remove extra newline --- plane/src/database/backend.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/plane/src/database/backend.rs b/plane/src/database/backend.rs index fc6a7bfa8..d77340d3c 100644 --- a/plane/src/database/backend.rs +++ b/plane/src/database/backend.rs @@ -355,7 +355,6 @@ impl<'a> BackendDatabase<'a> { #[derive(Debug, Clone)] pub struct TerminationCandidate { pub backend_id: BackendName, - pub expiration_time: Option>, pub last_keepalive: DateTime, pub allowed_idle_seconds: Option,