Skip to content

Commit

Permalink
More structured logging
Browse files Browse the repository at this point in the history
  • Loading branch information
paulgb committed Jan 20, 2024
1 parent 4df2b87 commit e9f2bbd
Show file tree
Hide file tree
Showing 19 changed files with 121 additions and 76 deletions.
3 changes: 2 additions & 1 deletion plane/plane-tests/tests/backend_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use hyper::StatusCode;
use plane::{
client::PlaneClientError,
database::backend::BackendActionMessage,
log_types::LoggableTime,
names::{DroneName, Name},
protocol::{BackendAction, Heartbeat, MessageFromDrone, MessageToDrone},
types::{ClusterName, ConnectRequest, ConnectResponse, ExecutorConfig, SpawnConfig},
Expand Down Expand Up @@ -59,7 +60,7 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) {
tracing::info!("Sending initial heartbeat message (mocking the drone).");
drone_connection
.send(MessageFromDrone::Heartbeat(Heartbeat {
local_time: Utc::now(),
local_time: LoggableTime(Utc::now()),
}))
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions plane/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
println!(
"Status: {} at {}",
status.status.to_string().magenta(),
status.time.to_string().bright_cyan()
status.time.0.to_string().bright_cyan()
);

if status.status >= BackendStatus::Ready {
Expand Down Expand Up @@ -206,7 +206,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
println!(
"Status: {} at {}",
status.status.to_string().magenta(),
status.time.to_string().bright_cyan()
status.time.0.to_string().bright_cyan()
);

if status.status >= BackendStatus::Terminated {
Expand Down
3 changes: 2 additions & 1 deletion plane/src/controller/backend_status.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{core::Controller, error::IntoApiError};
use crate::{
log_types::LoggableTime,
names::BackendName,
types::{BackendStatus, TimestampedBackendStatus},
};
Expand Down Expand Up @@ -29,7 +30,7 @@ async fn backend_status(

let result = TimestampedBackendStatus {
status: backend.last_status,
time: backend.last_status_time,
time: LoggableTime(backend.last_status_time),
};

Ok(result)
Expand Down
6 changes: 5 additions & 1 deletion plane/src/controller/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::{
response::IntoResponse,
};
use std::net::{IpAddr, SocketAddr};
use valuable::Valuable;

pub async fn dns_socket_inner(
ws: WebSocket,
Expand All @@ -21,7 +22,10 @@ pub async fn dns_socket_inner(

loop {
let message_from_dns_result = socket.recv().await;
tracing::info!(?message_from_dns_result, "Handling message from DNS...");
tracing::info!(
v = message_from_dns_result.as_value(),
"Handling message from DNS..."
);
match message_from_dns_result {
Some(MessageFromDns::TxtRecordRequest { cluster }) => {
let txt_value = match controller.db.acme().txt_record_for_cluster(&cluster).await {
Expand Down
14 changes: 6 additions & 8 deletions plane/src/controller/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use axum::{
response::{IntoResponse, Response},
};
use std::net::{IpAddr, SocketAddr};
use valuable::Valuable;

pub async fn handle_message_from_drone(
msg: MessageFromDrone,
Expand All @@ -37,14 +38,11 @@ pub async fn handle_message_from_drone(
controller
.db
.drone()
.heartbeat(drone_id, local_time)
.heartbeat(drone_id, local_time.0)
.await?;
}
MessageFromDrone::BackendEvent(backend_event) => {
tracing::info!(
event = ?backend_event,
"Received backend event"
);
tracing::info!(event = backend_event.as_value(), "Received backend event");

controller
.db
Expand Down Expand Up @@ -78,12 +76,12 @@ pub async fn handle_message_from_drone(
.await?;

let deadlines = KeyDeadlines {
renew_at: LoggableTime(renew_key_request.local_time + KEY_LEASE_RENEW_AFTER),
renew_at: LoggableTime(renew_key_request.local_time.0 + KEY_LEASE_RENEW_AFTER),
soft_terminate_at: LoggableTime(
renew_key_request.local_time + KEY_LEASE_SOFT_TERMINATE_AFTER,
renew_key_request.local_time.0 + KEY_LEASE_SOFT_TERMINATE_AFTER,
),
hard_terminate_at: LoggableTime(
renew_key_request.local_time + KEY_LEASE_HARD_TERMINATE_AFTER,
renew_key_request.local_time.0 + KEY_LEASE_HARD_TERMINATE_AFTER,
),
};

Expand Down
6 changes: 5 additions & 1 deletion plane/src/controller/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use axum::{
response::{IntoResponse, Response},
};
use std::net::{IpAddr, SocketAddr};
use valuable::Valuable;

pub async fn proxy_socket_inner(
cluster: ClusterName,
Expand Down Expand Up @@ -97,7 +98,10 @@ pub async fn proxy_socket_inner(
}
};

tracing::info!(?response, "Sending cert manager response");
tracing::info!(
response = response.as_value(),
"Sending cert manager response"
);

if let Err(err) = socket
.send(MessageToProxy::CertManagerResponse(response))
Expand Down
12 changes: 7 additions & 5 deletions plane/src/database/backend.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{subscribe::emit_with_key, util::MapSqlxError, PlaneDatabase};
use crate::{
log_types::{BackendAddr, LoggableTime},
names::{BackendActionName, BackendName},
protocol::{BackendAction, RouteInfo},
types::{BackendStatus, BearerToken, NodeId, SecretToken, TimestampedBackendStatus},
Expand Down Expand Up @@ -68,7 +69,7 @@ impl<'a> BackendDatabase<'a> {
match status {
Ok(status) => {
yield TimestampedBackendStatus {
time: row.created_at,
time: LoggableTime(row.created_at),
status,
};
last_status = Some(status);
Expand All @@ -95,7 +96,7 @@ impl<'a> BackendDatabase<'a> {

let item = TimestampedBackendStatus {
status,
time,
time: LoggableTime(time),
};

yield item;
Expand Down Expand Up @@ -148,7 +149,7 @@ impl<'a> BackendDatabase<'a> {
&self,
backend: &BackendName,
status: BackendStatus,
address: Option<SocketAddr>,
address: Option<BackendAddr>,
exit_code: Option<i32>,
) -> sqlx::Result<()> {
let mut txn = self.db.pool.begin().await?;
Expand All @@ -167,7 +168,7 @@ impl<'a> BackendDatabase<'a> {
"#,
backend.to_string(),
status.to_string(),
address.map(|a| a.to_string()),
address.map(|a| a.0.to_string()),
exit_code,
)
.execute(&mut *txn)
Expand Down Expand Up @@ -351,9 +352,10 @@ impl<'a> BackendDatabase<'a> {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct TerminationCandidate {
pub backend_id: BackendName,

pub expiration_time: Option<DateTime<Utc>>,
pub last_keepalive: DateTime<Utc>,
pub allowed_idle_seconds: Option<i32>,
Expand Down
4 changes: 2 additions & 2 deletions plane/src/database/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use chrono::{DateTime, Utc};
use sqlx::{postgres::types::PgInterval, query, query_as, PgPool};
use std::time::{Duration, SystemTime};
use std::time::Duration;

pub struct DroneDatabase<'a> {
pool: &'a PgPool,
Expand Down Expand Up @@ -114,5 +114,5 @@ impl<'a> DroneDatabase<'a> {

pub struct DroneForSpawn {
pub id: NodeId,
pub last_local_time: SystemTime,
pub last_local_time: DateTime<Utc>,
}
4 changes: 2 additions & 2 deletions plane/src/drone/docker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl PlaneDocker {
Ok(e) => e,
};

tracing::info!("Received event: {:?}", e);
tracing::info!(event=?e, "Received event");

let Some(actor) = e.actor else {
tracing::warn!("Received event without actor.");
Expand Down Expand Up @@ -101,7 +101,7 @@ impl PlaneDocker {
}
};

tracing::info!("Received exit code: {:?}", exit_code);
tracing::info!(exit_code, "Received exit code");

Some(TerminateEvent {
backend_id,
Expand Down
5 changes: 3 additions & 2 deletions plane/src/drone/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
heartbeat_consts::HEARTBEAT_INTERVAL, protocol::Heartbeat, typed_socket::TypedSocketSender,
heartbeat_consts::HEARTBEAT_INTERVAL, log_types::LoggableTime, protocol::Heartbeat,
typed_socket::TypedSocketSender,
};
use chrono::Utc;
use tokio::task::JoinHandle;
Expand All @@ -13,7 +14,7 @@ impl HeartbeatLoop {
pub fn start(sender: TypedSocketSender<Heartbeat>) -> Self {
let handle = tokio::spawn(async move {
loop {
let local_time = Utc::now();
let local_time = LoggableTime(Utc::now());
if let Err(err) = sender.send(Heartbeat { local_time }) {
tracing::error!(?err, "failed to send heartbeat");
}
Expand Down
28 changes: 18 additions & 10 deletions plane/src/drone/key_manager.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use super::executor::Executor;
use crate::{
log_types::LoggableTime,
names::BackendName,
protocol::{AcquiredKey, BackendAction, KeyDeadlines, RenewKeyRequest},
typed_socket::TypedSocketSender,
types::TerminationKind,
util::GuardHandle,
};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
};
use chrono::Utc;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::time::sleep;
use valuable::Valuable;

Expand All @@ -32,7 +30,7 @@ async fn renew_key_loop(
executor: Arc<Executor>,
) {
loop {
let now = SystemTime::now();
let now = Utc::now();
let deadlines = &key.deadlines;
if now >= deadlines.hard_terminate_at.0 {
tracing::warn!("Key {:?} has expired, hard-terminating.", key.key);
Expand Down Expand Up @@ -68,7 +66,12 @@ async fn renew_key_loop(
continue;
}

if let Ok(time_to_sleep) = deadlines.hard_terminate_at.0.duration_since(now) {
if let Ok(time_to_sleep) = deadlines
.hard_terminate_at
.0
.signed_duration_since(now)
.to_std()
{
sleep(time_to_sleep).await;
}

Expand All @@ -81,21 +84,26 @@ async fn renew_key_loop(
if let Some(ref sender) = sender {
let request = RenewKeyRequest {
backend: backend.clone(),
local_time: SystemTime::now(),
local_time: LoggableTime(Utc::now()),
};

if let Err(err) = sender.send(request) {
tracing::error!(%err, "Error sending renew key request.");
}
}

if let Ok(time_to_sleep) = deadlines.soft_terminate_at.0.duration_since(now) {
if let Ok(time_to_sleep) = deadlines
.soft_terminate_at
.0
.signed_duration_since(now)
.to_std()
{
sleep(time_to_sleep).await;
}
continue;
}

if let Ok(time_to_sleep) = deadlines.renew_at.0.duration_since(now) {
if let Ok(time_to_sleep) = deadlines.renew_at.0.signed_duration_since(now).to_std() {
sleep(time_to_sleep).await;
}
}
Expand Down
11 changes: 7 additions & 4 deletions plane/src/drone/state_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
log_types::LoggableTime,
names::BackendName,
protocol::{BackendEventId, BackendMetricsMessage, BackendStateMessage},
typed_socket::TypedSocketSender,
Expand Down Expand Up @@ -106,9 +107,9 @@ impl StateStore {
event_id,
backend_id: backend_id.clone(),
status: state.status,
address: state.address(),
address: state.address,
exit_code: state.exit_code,
timestamp,
timestamp: LoggableTime(timestamp),
};

listener(event_message);
Expand Down Expand Up @@ -170,9 +171,11 @@ impl StateStore {
event_id: BackendEventId::from(event_id),
backend_id: BackendName::try_from(backend_id)?,
status: state.status,
address: state.address(),
address: state.address,
exit_code: state.exit_code,
timestamp: DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(timestamp),
timestamp: LoggableTime(
DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(timestamp),
),
};

result.push(event);
Expand Down
1 change: 0 additions & 1 deletion plane/src/init_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub fn init_tracing() {
.with(layer)
.with(filter)
.init();
return;
} else {
let layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::FULL);

Expand Down
16 changes: 13 additions & 3 deletions plane/src/log_types.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, time::SystemTime};
use time::OffsetDateTime;
use valuable::{Tuplable, TupleDef, Valuable, Value, Visit};

// See: https://github.com/tokio-rs/valuable/issues/86#issuecomment-1760446976

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LoggableTime(pub SystemTime);
pub struct LoggableTime(#[serde(with = "chrono::serde::ts_milliseconds")] pub DateTime<Utc>);

impl Valuable for LoggableTime {
fn as_value(&self) -> Value<'_> {
Value::Tuplable(self)
}

fn visit(&self, visit: &mut dyn Visit) {
let s: String = format!("{:?}", self.0);
let s: String = format!("{}", self.0);
let val = Value::String(s.as_str());
visit.visit_unnamed_fields(&[val]);
}
Expand All @@ -25,7 +27,15 @@ impl Tuplable for LoggableTime {
}
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
impl From<OffsetDateTime> for LoggableTime {
fn from(offset: OffsetDateTime) -> Self {
let t: SystemTime = offset.into();
let dt: DateTime<Utc> = t.into();
Self(dt)
}
}

#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq)]
pub struct BackendAddr(pub SocketAddr);

impl valuable::Valuable for BackendAddr {
Expand Down
Loading

0 comments on commit e9f2bbd

Please sign in to comment.