Skip to content

Commit

Permalink
add mechanism for passing metrics TypedSender to BackendManager
Browse files Browse the repository at this point in the history
sender is acquired at the start of the drone loop, so should always be valid
while the drone<->controller connection is active.
  • Loading branch information
Abhishek Cherath committed Jan 11, 2024
1 parent 0c423e0 commit b2917a2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
5 changes: 4 additions & 1 deletion plane/src/drone/backend_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use super::{
};
use crate::{
names::BackendName,
protocol::BackendMetricsMessage,
typed_socket::TypedSocketSender,
types::{BackendState, BackendStatus, ExecutorConfig, PullPolicy, TerminationKind},
util::{ExponentialBackoff, GuardHandle},
};
Expand All @@ -14,7 +16,7 @@ use std::{error::Error, fmt::Debug};
use std::{future::pending, pin::Pin};
use std::{
net::IpAddr,
sync::{Arc, Mutex},
sync::{Arc, Mutex, RwLock},
};

/// The backend manager uses a state machine internally to manage the state of the backend.
Expand Down Expand Up @@ -84,6 +86,7 @@ impl BackendManager {
state: BackendState,
docker: PlaneDocker,
state_callback: impl Fn(&BackendState) -> Result<(), Box<dyn Error>> + Send + Sync + 'static,
_metrics_sender: Arc<RwLock<TypedSocketSender<BackendMetricsMessage>>>,
ip: IpAddr,
) -> Arc<Self> {
let container_id = ContainerId::from(format!("plane-{}", backend_id));
Expand Down
17 changes: 16 additions & 1 deletion plane/src/drone/executor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{backend_manager::BackendManager, docker::PlaneDocker, state_store::StateStore};
use crate::{
names::BackendName,
protocol::{BackendAction, BackendEventId, BackendStateMessage},
protocol::{BackendAction, BackendEventId, BackendMetricsMessage, BackendStateMessage},
typed_socket::TypedSocketSender,
types::BackendState,
util::GuardHandle,
};
Expand Down Expand Up @@ -68,6 +69,13 @@ impl Executor {
.register_listener(listener)
}

pub fn register_metrics_sender(&self, sender: TypedSocketSender<BackendMetricsMessage>) {
self.state_store
.lock()
.expect("State store lock poisoned")
.register_metrics_sender(sender);
}

pub fn ack_event(&self, event_id: BackendEventId) -> Result<()> {
self.state_store
.lock()
Expand Down Expand Up @@ -96,12 +104,19 @@ impl Executor {
}
};

let metrics_sender = self
.state_store
.lock()
.expect("State store lock poisoned")
.get_metrics_sender()?;

let manager = BackendManager::new(
backend_id.clone(),
executable.as_ref().clone(),
BackendState::default(),
self.docker.clone(),
callback,
metrics_sender,
self.ip,
);
tracing::info!("Inserting backend {}.", backend_id);
Expand Down
3 changes: 3 additions & 0 deletions plane/src/drone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub async fn drone_loop(
let mut socket = connection.connect_with_retry(&name).await;
let _heartbeat_guard = HeartbeatLoop::start(socket.sender(MessageFromDrone::Heartbeat));

let metrics_sender = socket.sender(MessageFromDrone::BackendMetrics);
executor.register_metrics_sender(metrics_sender);

key_manager
.lock()
.expect("Key manager lock poisoned")
Expand Down

0 comments on commit b2917a2

Please sign in to comment.