Skip to content

Commit

Permalink
Initial structured logging support
Browse files Browse the repository at this point in the history
  • Loading branch information
paulgb committed Jan 20, 2024
1 parent 90eefde commit 4df2b87
Show file tree
Hide file tree
Showing 21 changed files with 254 additions and 61 deletions.
4 changes: 4 additions & 0 deletions .cargo/config
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# See: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/index.html#unstable-features

[build]
rustflags = ["--cfg", "tracing_unstable"]
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion dev/controller.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#!/bin/sh

cargo run -- controller --db postgres://postgres@localhost "$@" --default-cluster localhost:9090
PLANE_LOG_JSON=true \
cargo run -- \
controller \
--db postgres://postgres@localhost \
--default-cluster localhost:9090 \
"$@"
8 changes: 7 additions & 1 deletion dev/drone.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
#!/bin/sh

cargo run -- drone --controller-url ws://localhost:8080/ --cluster "localhost:9090" --db "drone.sqlite" "$@"
PLANE_LOG_JSON=true \
cargo run -- \
drone \
--controller-url ws://localhost:8080/ \
--cluster "localhost:9090" \
--db "drone.sqlite" \
"$@"
7 changes: 6 additions & 1 deletion dev/proxy.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#!/bin/sh

cargo run -- proxy --controller-url ws://localhost:8080/ --cluster "localhost:9090" "$@"
PLANE_LOG_JSON=true \
cargo run -- \
proxy \
--controller-url ws://localhost:8080/ \
--cluster "localhost:9090" \
"$@"
3 changes: 2 additions & 1 deletion plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ tokio-stream = "0.1.14"
tokio-tungstenite = { version = "0.20.1", features = ["rustls-tls-webpki-roots"] }
tower-http = { version = "0.4.4", features = ["trace", "cors"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json", "valuable"] }
trust-dns-server = "0.23.2"
tungstenite = "0.20.1"
url = "2.4.1"
x509-parser = "0.15.1"
valuable = { version = "0.1.0", features = ["derive"] }
11 changes: 8 additions & 3 deletions plane/src/controller/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
subscribe::Subscription,
PlaneDatabase,
},
log_types::LoggableTime,
protocol::{
BackendAction, Heartbeat, KeyDeadlines, MessageFromDrone, MessageToDrone, RenewKeyResponse,
},
Expand Down Expand Up @@ -77,9 +78,13 @@ pub async fn handle_message_from_drone(
.await?;

let deadlines = KeyDeadlines {
renew_at: renew_key_request.local_time + KEY_LEASE_RENEW_AFTER,
soft_terminate_at: renew_key_request.local_time + KEY_LEASE_SOFT_TERMINATE_AFTER,
hard_terminate_at: renew_key_request.local_time + KEY_LEASE_HARD_TERMINATE_AFTER,
renew_at: LoggableTime(renew_key_request.local_time + KEY_LEASE_RENEW_AFTER),
soft_terminate_at: LoggableTime(
renew_key_request.local_time + KEY_LEASE_SOFT_TERMINATE_AFTER,
),
hard_terminate_at: LoggableTime(
renew_key_request.local_time + KEY_LEASE_HARD_TERMINATE_AFTER,
),
};

let renew_key_response = RenewKeyResponse {
Expand Down
11 changes: 8 additions & 3 deletions plane/src/database/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
backend_key::{KeysDatabase, KEY_LEASE_EXPIRATION},
drone::DroneDatabase,
},
log_types::LoggableTime,
names::{BackendName, Name},
protocol::{AcquiredKey, BackendAction, KeyDeadlines},
types::{
Expand Down Expand Up @@ -129,9 +130,13 @@ async fn create_backend_with_key(
let acquired_key = AcquiredKey {
key: key.clone(),
deadlines: KeyDeadlines {
renew_at: drone_for_spawn.last_local_time + KEY_LEASE_RENEW_AFTER,
soft_terminate_at: drone_for_spawn.last_local_time + KEY_LEASE_SOFT_TERMINATE_AFTER,
hard_terminate_at: drone_for_spawn.last_local_time + KEY_LEASE_SOFT_TERMINATE_AFTER,
renew_at: LoggableTime(drone_for_spawn.last_local_time + KEY_LEASE_RENEW_AFTER),
soft_terminate_at: LoggableTime(
drone_for_spawn.last_local_time + KEY_LEASE_SOFT_TERMINATE_AFTER,
),
hard_terminate_at: LoggableTime(
drone_for_spawn.last_local_time + KEY_LEASE_SOFT_TERMINATE_AFTER,
),
},
token: result.fencing_token,
};
Expand Down
2 changes: 1 addition & 1 deletion plane/src/database/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl EventSubscriptionManager {

let mut listeners = self.listeners.write().expect("Listener map is poisoned.");
let key = (kind.clone(), key.map(|s| s.to_string()));
tracing::info!(?key, "Subscribing to event");
tracing::info!(kind = key.0, key = key.1, "Subscribing to event");

match listeners.entry(key.clone()) {
std::collections::hash_map::Entry::Occupied(entry) => {
Expand Down
15 changes: 10 additions & 5 deletions plane/src/drone/backend_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
net::IpAddr,
sync::{Arc, Mutex, RwLock},
};
use valuable::Valuable;

/// The backend manager uses a state machine internally to manage the state of the backend.
/// Each time we enter a state, we can do one of three things:
Expand Down Expand Up @@ -233,7 +234,7 @@ impl BackendManager {
})
}
BackendStatus::Waiting => StepStatusResult::future_status(async move {
let address = match state.address {
let address = match state.address() {
Some(address) => address,
None => {
tracing::error!("State is waiting, but no associated address.");
Expand Down Expand Up @@ -278,19 +279,23 @@ impl BackendManager {
}
}

pub fn set_state(self: &Arc<Self>, status: BackendState) {
tracing::info!(?self.backend_id, ?status, "Updating backend state");
pub fn set_state(self: &Arc<Self>, state: BackendState) {
tracing::info!(
backend_id = self.backend_id.as_value(),
state = state.as_value(),
"Updating backend state"
);
let mut handle = self.handle.lock().expect("Guard handle lock is poisoned");
// Cancel any existing task.
handle.take();

// Call the callback.
if let Err(err) = (self.state_callback)(&status) {
if let Err(err) = (self.state_callback)(&state) {
tracing::error!(?err, "Error calling state callback.");
return;
}

let result = self.step_state(status);
let result = self.step_state(state);
match result {
StepStatusResult::DoNothing => {}
StepStatusResult::SetState(status) => {
Expand Down
2 changes: 1 addition & 1 deletion plane/src/drone/docker/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ fn get_container_config_from_executor_config(
.resource_limits
.cpu_time_limit
.map(|cpu_time_limit| {
let Ok(secs) = cpu_time_limit.as_secs().try_into() else {
let Ok(secs) = cpu_time_limit.0.as_secs().try_into() else {
tracing::warn!(
"unable to convert cpu_time_limit: {:?} to i64 for use in ulimit",
cpu_time_limit
Expand Down
15 changes: 8 additions & 7 deletions plane/src/drone/key_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
time::{Duration, SystemTime},
};
use tokio::time::sleep;
use valuable::Valuable;

pub struct KeyManager {
executor: Arc<Executor>,
Expand All @@ -33,7 +34,7 @@ async fn renew_key_loop(
loop {
let now = SystemTime::now();
let deadlines = &key.deadlines;
if now >= deadlines.hard_terminate_at {
if now >= deadlines.hard_terminate_at.0 {
tracing::warn!("Key {:?} has expired, hard-terminating.", key.key);
if let Err(err) = executor
.apply_action(
Expand All @@ -51,7 +52,7 @@ async fn renew_key_loop(
break;
}

if now >= deadlines.soft_terminate_at {
if now >= deadlines.soft_terminate_at.0 {
tracing::warn!("Key {:?} has expired, soft-terminating.", key.key);
if let Err(err) = executor
.apply_action(
Expand All @@ -67,15 +68,15 @@ async fn renew_key_loop(
continue;
}

if let Ok(time_to_sleep) = deadlines.hard_terminate_at.duration_since(now) {
if let Ok(time_to_sleep) = deadlines.hard_terminate_at.0.duration_since(now) {
sleep(time_to_sleep).await;
}

continue;
}

if now >= deadlines.renew_at {
tracing::info!("Renewing key {:?}.", key.key);
if now >= deadlines.renew_at.0 {
tracing::info!(key = key.key.as_value(), "Renewing key.");

if let Some(ref sender) = sender {
let request = RenewKeyRequest {
Expand All @@ -88,13 +89,13 @@ async fn renew_key_loop(
}
}

if let Ok(time_to_sleep) = deadlines.soft_terminate_at.duration_since(now) {
if let Ok(time_to_sleep) = deadlines.soft_terminate_at.0.duration_since(now) {
sleep(time_to_sleep).await;
}
continue;
}

if let Ok(time_to_sleep) = deadlines.renew_at.duration_since(now) {
if let Ok(time_to_sleep) = deadlines.renew_at.0.duration_since(now) {
sleep(time_to_sleep).await;
}
}
Expand Down
14 changes: 11 additions & 3 deletions plane/src/drone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
sync::{Arc, Mutex},
};
use tokio::task::JoinHandle;
use valuable::Valuable;

mod backend_manager;
pub mod docker;
Expand Down Expand Up @@ -87,7 +88,11 @@ pub async fn drone_loop(
action,
..
}) => {
tracing::info!(?backend_id, ?action, "Received action.");
tracing::info!(
backend_id = backend_id.as_value(),
action = action.as_value(),
"Received action."
);

if let BackendAction::Spawn { key, .. } = &action {
// Register the key with the key manager, ensuring that it will be refreshed.
Expand All @@ -108,14 +113,17 @@ pub async fn drone_loop(
}
}
MessageToDrone::AckEvent { event_id } => {
tracing::info!(?event_id, "Received status ack.");
tracing::info!(event_id = event_id.as_value(), "Received status ack.");
if let Err(err) = executor.ack_event(event_id) {
tracing::error!(?err, "Error acking event.");
}
}
MessageToDrone::RenewKeyResponse(renew_key_response) => {
let RenewKeyResponse { backend, deadlines } = renew_key_response;
tracing::info!(?deadlines, "Received key renewal response.");
tracing::info!(
deadlines = deadlines.as_value(),
"Received key renewal response."
);

if let Some(deadlines) = deadlines {
key_manager
Expand Down
4 changes: 2 additions & 2 deletions plane/src/drone/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl StateStore {
event_id,
backend_id: backend_id.clone(),
status: state.status,
address: state.address,
address: state.address(),
exit_code: state.exit_code,
timestamp,
};
Expand Down Expand Up @@ -170,7 +170,7 @@ impl StateStore {
event_id: BackendEventId::from(event_id),
backend_id: BackendName::try_from(backend_id)?,
status: state.status,
address: state.address,
address: state.address(),
exit_code: state.exit_code,
timestamp: DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(timestamp),
};
Expand Down
24 changes: 19 additions & 5 deletions plane/src/init_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,24 @@ pub fn init_tracing() {
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();

let layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::FULL);
// Use JSON if PLANE_LOG_JSON is set to anything other than "false".
let use_json = std::env::var("PLANE_LOG_JSON")
.map(|s| s != "false")
.unwrap_or_default();

tracing_subscriber::registry()
.with(layer)
.with(filter)
.init();
if use_json {
let layer = tracing_subscriber::fmt::layer().json();
tracing_subscriber::registry()
.with(layer)
.with(filter)
.init();
return;

Check warning on line 22 in plane/src/init_tracing.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement

warning: unneeded `return` statement --> plane/src/init_tracing.rs:21:21 | 21 | .init(); | _____________________^ 22 | | return; | |______________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return = note: `#[warn(clippy::needless_return)]` on by default help: remove `return` | 21 - .init(); 22 - return; 21 + .init(); |
} else {
let layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::FULL);

tracing_subscriber::registry()
.with(layer)
.with(filter)
.init();
}
}
1 change: 1 addition & 0 deletions plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod dns;
pub mod drone;
pub mod heartbeat_consts;
pub mod init_tracing;
pub mod log_types;
pub mod names;
pub mod protocol;
pub mod proxy;
Expand Down
Loading

0 comments on commit 4df2b87

Please sign in to comment.