diff --git a/plane/plane-tests/tests/backend_actions.rs b/plane/plane-tests/tests/backend_actions.rs index e8e5d52f7..ce1156e8c 100644 --- a/plane/plane-tests/tests/backend_actions.rs +++ b/plane/plane-tests/tests/backend_actions.rs @@ -4,6 +4,7 @@ use hyper::StatusCode; use plane::{ client::PlaneClientError, database::backend::BackendActionMessage, + log_types::LoggableTime, names::{DroneName, Name}, protocol::{BackendAction, Heartbeat, MessageFromDrone, MessageToDrone}, types::{ClusterName, ConnectRequest, ConnectResponse, ExecutorConfig, SpawnConfig}, @@ -59,7 +60,7 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) { tracing::info!("Sending initial heartbeat message (mocking the drone)."); drone_connection .send(MessageFromDrone::Heartbeat(Heartbeat { - local_time: Utc::now(), + local_time: LoggableTime(Utc::now()), })) .await .unwrap(); diff --git a/plane/src/admin.rs b/plane/src/admin.rs index 24d0b6514..0c08a6f2c 100644 --- a/plane/src/admin.rs +++ b/plane/src/admin.rs @@ -174,7 +174,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE println!( "Status: {} at {}", status.status.to_string().magenta(), - status.time.to_string().bright_cyan() + status.time.0.to_string().bright_cyan() ); if status.status >= BackendStatus::Ready { @@ -206,7 +206,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE println!( "Status: {} at {}", status.status.to_string().magenta(), - status.time.to_string().bright_cyan() + status.time.0.to_string().bright_cyan() ); if status.status >= BackendStatus::Terminated { diff --git a/plane/src/controller/backend_status.rs b/plane/src/controller/backend_status.rs index eb4c81c56..ce9529b08 100644 --- a/plane/src/controller/backend_status.rs +++ b/plane/src/controller/backend_status.rs @@ -1,5 +1,6 @@ use super::{core::Controller, error::IntoApiError}; use crate::{ + log_types::LoggableTime, names::BackendName, types::{BackendStatus, TimestampedBackendStatus}, }; @@ -29,7 +30,7 @@ async fn backend_status( let result = TimestampedBackendStatus { status: backend.last_status, - time: backend.last_status_time, + time: LoggableTime(backend.last_status_time), }; Ok(result) diff --git a/plane/src/controller/dns.rs b/plane/src/controller/dns.rs index c064015b6..592338315 100644 --- a/plane/src/controller/dns.rs +++ b/plane/src/controller/dns.rs @@ -8,6 +8,7 @@ use axum::{ response::IntoResponse, }; use std::net::{IpAddr, SocketAddr}; +use valuable::Valuable; pub async fn dns_socket_inner( ws: WebSocket, @@ -21,7 +22,10 @@ pub async fn dns_socket_inner( loop { let message_from_dns_result = socket.recv().await; - tracing::info!(?message_from_dns_result, "Handling message from DNS..."); + tracing::info!( + v = message_from_dns_result.as_value(), + "Handling message from DNS..." + ); match message_from_dns_result { Some(MessageFromDns::TxtRecordRequest { cluster }) => { let txt_value = match controller.db.acme().txt_record_for_cluster(&cluster).await { diff --git a/plane/src/controller/drone.rs b/plane/src/controller/drone.rs index f2239a092..b27cfda4a 100644 --- a/plane/src/controller/drone.rs +++ b/plane/src/controller/drone.rs @@ -22,6 +22,7 @@ use axum::{ response::{IntoResponse, Response}, }; use std::net::{IpAddr, SocketAddr}; +use valuable::Valuable; pub async fn handle_message_from_drone( msg: MessageFromDrone, @@ -37,14 +38,11 @@ pub async fn handle_message_from_drone( controller .db .drone() - .heartbeat(drone_id, local_time) + .heartbeat(drone_id, local_time.0) .await?; } MessageFromDrone::BackendEvent(backend_event) => { - tracing::info!( - event = ?backend_event, - "Received backend event" - ); + tracing::info!(event = backend_event.as_value(), "Received backend event"); controller .db @@ -78,12 +76,12 @@ pub async fn handle_message_from_drone( .await?; let deadlines = KeyDeadlines { - renew_at: LoggableTime(renew_key_request.local_time + KEY_LEASE_RENEW_AFTER), + renew_at: LoggableTime(renew_key_request.local_time.0 + KEY_LEASE_RENEW_AFTER), soft_terminate_at: LoggableTime( - renew_key_request.local_time + KEY_LEASE_SOFT_TERMINATE_AFTER, + renew_key_request.local_time.0 + KEY_LEASE_SOFT_TERMINATE_AFTER, ), hard_terminate_at: LoggableTime( - renew_key_request.local_time + KEY_LEASE_HARD_TERMINATE_AFTER, + renew_key_request.local_time.0 + KEY_LEASE_HARD_TERMINATE_AFTER, ), }; diff --git a/plane/src/controller/proxy.rs b/plane/src/controller/proxy.rs index 688e3b87d..8a5a3ddf2 100644 --- a/plane/src/controller/proxy.rs +++ b/plane/src/controller/proxy.rs @@ -14,6 +14,7 @@ use axum::{ response::{IntoResponse, Response}, }; use std::net::{IpAddr, SocketAddr}; +use valuable::Valuable; pub async fn proxy_socket_inner( cluster: ClusterName, @@ -97,7 +98,10 @@ pub async fn proxy_socket_inner( } }; - tracing::info!(?response, "Sending cert manager response"); + tracing::info!( + response = response.as_value(), + "Sending cert manager response" + ); if let Err(err) = socket .send(MessageToProxy::CertManagerResponse(response)) diff --git a/plane/src/database/backend.rs b/plane/src/database/backend.rs index 9169d8bcd..3c49b7232 100644 --- a/plane/src/database/backend.rs +++ b/plane/src/database/backend.rs @@ -1,5 +1,6 @@ use super::{subscribe::emit_with_key, util::MapSqlxError, PlaneDatabase}; use crate::{ + log_types::{BackendAddr, LoggableTime}, names::{BackendActionName, BackendName}, protocol::{BackendAction, RouteInfo}, types::{BackendStatus, BearerToken, NodeId, SecretToken, TimestampedBackendStatus}, @@ -68,7 +69,7 @@ impl<'a> BackendDatabase<'a> { match status { Ok(status) => { yield TimestampedBackendStatus { - time: row.created_at, + time: LoggableTime(row.created_at), status, }; last_status = Some(status); @@ -95,7 +96,7 @@ impl<'a> BackendDatabase<'a> { let item = TimestampedBackendStatus { status, - time, + time: LoggableTime(time), }; yield item; @@ -148,7 +149,7 @@ impl<'a> BackendDatabase<'a> { &self, backend: &BackendName, status: BackendStatus, - address: Option, + address: Option, exit_code: Option, ) -> sqlx::Result<()> { let mut txn = self.db.pool.begin().await?; @@ -167,7 +168,7 @@ impl<'a> BackendDatabase<'a> { "#, backend.to_string(), status.to_string(), - address.map(|a| a.to_string()), + address.map(|a| a.0.to_string()), exit_code, ) .execute(&mut *txn) @@ -351,9 +352,10 @@ impl<'a> BackendDatabase<'a> { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub struct TerminationCandidate { pub backend_id: BackendName, + pub expiration_time: Option>, pub last_keepalive: DateTime, pub allowed_idle_seconds: Option, diff --git a/plane/src/database/drone.rs b/plane/src/database/drone.rs index e0447a573..98c46dd36 100644 --- a/plane/src/database/drone.rs +++ b/plane/src/database/drone.rs @@ -4,7 +4,7 @@ use crate::{ }; use chrono::{DateTime, Utc}; use sqlx::{postgres::types::PgInterval, query, query_as, PgPool}; -use std::time::{Duration, SystemTime}; +use std::time::Duration; pub struct DroneDatabase<'a> { pool: &'a PgPool, @@ -114,5 +114,5 @@ impl<'a> DroneDatabase<'a> { pub struct DroneForSpawn { pub id: NodeId, - pub last_local_time: SystemTime, + pub last_local_time: DateTime, } diff --git a/plane/src/drone/docker/mod.rs b/plane/src/drone/docker/mod.rs index 5e83f263e..1c5e6bacf 100644 --- a/plane/src/drone/docker/mod.rs +++ b/plane/src/drone/docker/mod.rs @@ -72,7 +72,7 @@ impl PlaneDocker { Ok(e) => e, }; - tracing::info!("Received event: {:?}", e); + tracing::info!(event=?e, "Received event"); let Some(actor) = e.actor else { tracing::warn!("Received event without actor."); @@ -101,7 +101,7 @@ impl PlaneDocker { } }; - tracing::info!("Received exit code: {:?}", exit_code); + tracing::info!(exit_code, "Received exit code"); Some(TerminateEvent { backend_id, diff --git a/plane/src/drone/heartbeat.rs b/plane/src/drone/heartbeat.rs index f190aa433..eb7d10199 100644 --- a/plane/src/drone/heartbeat.rs +++ b/plane/src/drone/heartbeat.rs @@ -1,5 +1,6 @@ use crate::{ - heartbeat_consts::HEARTBEAT_INTERVAL, protocol::Heartbeat, typed_socket::TypedSocketSender, + heartbeat_consts::HEARTBEAT_INTERVAL, log_types::LoggableTime, protocol::Heartbeat, + typed_socket::TypedSocketSender, }; use chrono::Utc; use tokio::task::JoinHandle; @@ -13,7 +14,7 @@ impl HeartbeatLoop { pub fn start(sender: TypedSocketSender) -> Self { let handle = tokio::spawn(async move { loop { - let local_time = Utc::now(); + let local_time = LoggableTime(Utc::now()); if let Err(err) = sender.send(Heartbeat { local_time }) { tracing::error!(?err, "failed to send heartbeat"); } diff --git a/plane/src/drone/key_manager.rs b/plane/src/drone/key_manager.rs index e5aaa477a..b9abe5bd5 100644 --- a/plane/src/drone/key_manager.rs +++ b/plane/src/drone/key_manager.rs @@ -1,16 +1,14 @@ use super::executor::Executor; use crate::{ + log_types::LoggableTime, names::BackendName, protocol::{AcquiredKey, BackendAction, KeyDeadlines, RenewKeyRequest}, typed_socket::TypedSocketSender, types::TerminationKind, util::GuardHandle, }; -use std::{ - collections::HashMap, - sync::Arc, - time::{Duration, SystemTime}, -}; +use chrono::Utc; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::time::sleep; use valuable::Valuable; @@ -32,7 +30,7 @@ async fn renew_key_loop( executor: Arc, ) { loop { - let now = SystemTime::now(); + let now = Utc::now(); let deadlines = &key.deadlines; if now >= deadlines.hard_terminate_at.0 { tracing::warn!("Key {:?} has expired, hard-terminating.", key.key); @@ -68,7 +66,12 @@ async fn renew_key_loop( continue; } - if let Ok(time_to_sleep) = deadlines.hard_terminate_at.0.duration_since(now) { + if let Ok(time_to_sleep) = deadlines + .hard_terminate_at + .0 + .signed_duration_since(now) + .to_std() + { sleep(time_to_sleep).await; } @@ -81,7 +84,7 @@ async fn renew_key_loop( if let Some(ref sender) = sender { let request = RenewKeyRequest { backend: backend.clone(), - local_time: SystemTime::now(), + local_time: LoggableTime(Utc::now()), }; if let Err(err) = sender.send(request) { @@ -89,13 +92,18 @@ async fn renew_key_loop( } } - if let Ok(time_to_sleep) = deadlines.soft_terminate_at.0.duration_since(now) { + if let Ok(time_to_sleep) = deadlines + .soft_terminate_at + .0 + .signed_duration_since(now) + .to_std() + { sleep(time_to_sleep).await; } continue; } - if let Ok(time_to_sleep) = deadlines.renew_at.0.duration_since(now) { + if let Ok(time_to_sleep) = deadlines.renew_at.0.signed_duration_since(now).to_std() { sleep(time_to_sleep).await; } } diff --git a/plane/src/drone/state_store.rs b/plane/src/drone/state_store.rs index f73db37f9..9ba18574c 100644 --- a/plane/src/drone/state_store.rs +++ b/plane/src/drone/state_store.rs @@ -1,4 +1,5 @@ use crate::{ + log_types::LoggableTime, names::BackendName, protocol::{BackendEventId, BackendMetricsMessage, BackendStateMessage}, typed_socket::TypedSocketSender, @@ -106,9 +107,9 @@ impl StateStore { event_id, backend_id: backend_id.clone(), status: state.status, - address: state.address(), + address: state.address, exit_code: state.exit_code, - timestamp, + timestamp: LoggableTime(timestamp), }; listener(event_message); @@ -170,9 +171,11 @@ impl StateStore { event_id: BackendEventId::from(event_id), backend_id: BackendName::try_from(backend_id)?, status: state.status, - address: state.address(), + address: state.address, exit_code: state.exit_code, - timestamp: DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(timestamp), + timestamp: LoggableTime( + DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(timestamp), + ), }; result.push(event); diff --git a/plane/src/init_tracing.rs b/plane/src/init_tracing.rs index ccdb2b1b3..727eee53c 100644 --- a/plane/src/init_tracing.rs +++ b/plane/src/init_tracing.rs @@ -19,7 +19,6 @@ pub fn init_tracing() { .with(layer) .with(filter) .init(); - return; } else { let layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::FULL); diff --git a/plane/src/log_types.rs b/plane/src/log_types.rs index ffa75433a..ec6cfe5ea 100644 --- a/plane/src/log_types.rs +++ b/plane/src/log_types.rs @@ -1,11 +1,13 @@ +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::{net::SocketAddr, time::SystemTime}; +use time::OffsetDateTime; use valuable::{Tuplable, TupleDef, Valuable, Value, Visit}; // See: https://github.com/tokio-rs/valuable/issues/86#issuecomment-1760446976 #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct LoggableTime(pub SystemTime); +pub struct LoggableTime(#[serde(with = "chrono::serde::ts_milliseconds")] pub DateTime); impl Valuable for LoggableTime { fn as_value(&self) -> Value<'_> { @@ -13,7 +15,7 @@ impl Valuable for LoggableTime { } fn visit(&self, visit: &mut dyn Visit) { - let s: String = format!("{:?}", self.0); + let s: String = format!("{}", self.0); let val = Value::String(s.as_str()); visit.visit_unnamed_fields(&[val]); } @@ -25,7 +27,15 @@ impl Tuplable for LoggableTime { } } -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +impl From for LoggableTime { + fn from(offset: OffsetDateTime) -> Self { + let t: SystemTime = offset.into(); + let dt: DateTime = t.into(); + Self(dt) + } +} + +#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq)] pub struct BackendAddr(pub SocketAddr); impl valuable::Valuable for BackendAddr { diff --git a/plane/src/protocol.rs b/plane/src/protocol.rs index c7bb05914..0d2ed8b60 100644 --- a/plane/src/protocol.rs +++ b/plane/src/protocol.rs @@ -1,6 +1,6 @@ use crate::{ database::backend::BackendActionMessage, - log_types::LoggableTime, + log_types::{BackendAddr, LoggableTime}, names::{BackendActionName, BackendName}, typed_socket::ChannelMessage, types::{ @@ -8,9 +8,8 @@ use crate::{ TerminationKind, }, }; -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use std::{net::SocketAddr, time::SystemTime}; +use std::net::SocketAddr; #[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable)] pub struct KeyDeadlines { @@ -51,19 +50,19 @@ pub enum BackendAction { }, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, valuable::Valuable)] pub struct BackendStateMessage { pub event_id: BackendEventId, pub backend_id: BackendName, pub status: BackendStatus, #[serde(skip_serializing_if = "Option::is_none")] - pub address: Option, + pub address: Option, #[serde(skip_serializing_if = "Option::is_none")] pub exit_code: Option, - pub timestamp: DateTime, + pub timestamp: LoggableTime, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, valuable::Valuable)] @@ -85,12 +84,12 @@ impl From for i64 { pub struct RenewKeyRequest { pub backend: BackendName, - pub local_time: SystemTime, + pub local_time: LoggableTime, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Heartbeat { - pub local_time: DateTime, + pub local_time: LoggableTime, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -178,7 +177,7 @@ impl ChannelMessage for CertManagerRequest { type Reply = CertManagerResponse; } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, valuable::Valuable)] pub enum CertManagerResponse { /// Acknowledge a lease request and indicate whether it was accepted. CertLeaseResponse { accepted: bool }, @@ -231,7 +230,7 @@ impl ChannelMessage for MessageToProxy { type Reply = MessageFromProxy; } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, valuable::Valuable)] pub enum MessageFromDns { TxtRecordRequest { cluster: ClusterName }, } diff --git a/plane/src/proxy/cert_manager.rs b/plane/src/proxy/cert_manager.rs index 51af8e5c5..fe0916d7f 100644 --- a/plane/src/proxy/cert_manager.rs +++ b/plane/src/proxy/cert_manager.rs @@ -1,5 +1,6 @@ use super::{cert_pair::CertificatePair, AcmeConfig}; use crate::{ + log_types::LoggableTime, protocol::{CertManagerRequest, CertManagerResponse}, types::ClusterName, }; @@ -8,10 +9,12 @@ use acme2_eab::{ DirectoryBuilder, OrderBuilder, OrderStatus, }; use anyhow::{anyhow, Context, Result}; +use chrono::Utc; use std::{ + ops::Sub, path::{Path, PathBuf}, sync::{Arc, Mutex}, - time::{Duration, SystemTime}, + time::Duration, }; use tokio::sync::{ broadcast, @@ -21,6 +24,7 @@ use tokio_rustls::rustls::{ server::{ClientHello, ResolvesServerCert}, sign::CertifiedKey, }; +use valuable::Valuable; const DNS_01: &str = "dns-01"; @@ -209,21 +213,23 @@ async fn refresh_loop_step( let last_current_cert = send_cert.borrow().clone(); if let Some(current_cert) = last_current_cert { - let renewal_time = current_cert - .validity_end - .checked_sub(RENEWAL_WINDOW) - .expect("Cert validity date arithmetic failed."); - let time_until_renew = renewal_time.duration_since(SystemTime::now())?; + let renewal_time = LoggableTime(current_cert.validity_end.0.sub(RENEWAL_WINDOW)); + let time_until_renew = renewal_time.0.sub(Utc::now()); - if time_until_renew > Duration::ZERO { + if time_until_renew > chrono::Duration::zero() { tracing::info!( - "Certificate for {} is valid until {:?}. Renewal scheduled for {:?} ({:?} from now). Sleeping.", - current_cert.common_name, - current_cert.validity_end, - renewal_time, - time_until_renew + common_name = current_cert.common_name, + validity_end = current_cert.validity_end.as_value(), + renewal_time = renewal_time.as_value(), + days_until_renew = time_until_renew.num_days(), + "Obtained certificate.", ); - tokio::time::sleep(time_until_renew).await; + tokio::time::sleep( + time_until_renew + .to_std() + .expect("time_until_renew is always positive."), + ) + .await; return Ok(()); } } @@ -355,7 +361,10 @@ async fn get_certificate( return Err(anyhow!("Cert manager error.")); } }; - tracing::info!(?response, "Received response from cert manager."); + tracing::info!( + response = response.as_value(), + "Received response from cert manager." + ); match response { CertManagerResponse::SetTxtRecordResponse { accepted: true } => (), diff --git a/plane/src/proxy/cert_pair.rs b/plane/src/proxy/cert_pair.rs index 7cc27e4f1..0066a8d40 100644 --- a/plane/src/proxy/cert_pair.rs +++ b/plane/src/proxy/cert_pair.rs @@ -1,8 +1,9 @@ +use crate::log_types::LoggableTime; use anyhow::{anyhow, Result}; use pem::Pem; use rustls_pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs1KeyDer}; use serde::{Deserialize, Serialize}; -use std::{fs::Permissions, io, os::unix::fs::PermissionsExt, path::Path, time::SystemTime}; +use std::{fs::Permissions, io, os::unix::fs::PermissionsExt, path::Path}; use tokio_rustls::rustls::{ sign::{any_supported_type, CertifiedKey}, Certificate, PrivateKey, @@ -42,8 +43,8 @@ pub struct CertificatePair { pub certified_key: CertifiedKey, pub private_key_der: Vec, pub common_name: String, - pub validity_start: SystemTime, - pub validity_end: SystemTime, + pub validity_start: LoggableTime, + pub validity_end: LoggableTime, } impl CertificatePair { diff --git a/plane/src/proxy/proxy_connection.rs b/plane/src/proxy/proxy_connection.rs index dcc1f9b72..7dadb0ab4 100644 --- a/plane/src/proxy/proxy_connection.rs +++ b/plane/src/proxy/proxy_connection.rs @@ -7,6 +7,7 @@ use crate::{ }; use std::sync::Arc; use tokio::task::JoinHandle; +use valuable::Valuable; pub struct ProxyConnection { handle: JoinHandle<()>, @@ -58,7 +59,10 @@ impl ProxyConnection { state.route_map.receive(response); } MessageToProxy::CertManagerResponse(response) => { - tracing::info!("Received cert manager response: {:?}", response); + tracing::info!( + response = response.as_value(), + "Received cert manager response" + ); cert_manager.receive(response); } } diff --git a/plane/src/types.rs b/plane/src/types.rs index e651a7083..12c634a2b 100644 --- a/plane/src/types.rs +++ b/plane/src/types.rs @@ -1,8 +1,10 @@ use crate::{ - client::PlaneClient, log_types::BackendAddr, names::BackendName, util::random_prefixed_string, + client::PlaneClient, + log_types::{BackendAddr, LoggableTime}, + names::BackendName, + util::random_prefixed_string, }; use bollard::auth::DockerCredentials; -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::{collections::HashMap, fmt::Display, net::SocketAddr, str::FromStr}; @@ -28,7 +30,7 @@ impl Display for NodeId { } } -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash, Eq)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash, valuable::Valuable)] pub struct ClusterName(String); impl ClusterName { @@ -489,6 +491,5 @@ impl TryFrom for NodeKind { pub struct TimestampedBackendStatus { pub status: BackendStatus, - #[serde(with = "chrono::serde::ts_milliseconds")] - pub time: DateTime, + pub time: LoggableTime, }