Skip to content

Commit

Permalink
StateManager will now clean up actions on client disconnect
Browse files Browse the repository at this point in the history
StateManager will now properly remove items from the maps if the
client disconnects after a set amount of time. Currently these
values are hard-coded, but it will be easy to transition them to
use config variables once we design it out.
  • Loading branch information
allada committed Jul 8, 2024
1 parent 0d93671 commit b60e10f
Show file tree
Hide file tree
Showing 12 changed files with 384 additions and 121 deletions.
3 changes: 0 additions & 3 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ pub trait ActionScheduler: Sync + Send + Unpin {
client_operation_id: &ClientOperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error>;

/// Cleans up the cache of recently completed actions.
async fn clean_recently_completed_actions(&self);

/// Register the metrics for the action scheduler.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}
2 changes: 0 additions & 2 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,4 @@ impl ActionScheduler for CacheLookupScheduler {
.find_by_client_operation_id(client_operation_id)
.await
}

/// No-op: this implementation has an empty body and performs no cleanup itself.
async fn clean_recently_completed_actions(&self) {}
}
19 changes: 0 additions & 19 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use nativelink_config::schedulers::SchedulerConfig;
use nativelink_error::{Error, ResultExt};
use nativelink_store::store_manager::StoreManager;
use nativelink_util::background_spawn;
use nativelink_util::metrics_utils::Registry;
use tokio::time::interval;

use crate::action_scheduler::ActionScheduler;
use crate::cache_lookup_scheduler::CacheLookupScheduler;
Expand Down Expand Up @@ -88,7 +85,6 @@ fn inner_scheduler_factory(

if let Some(scheduler_metrics) = maybe_scheduler_metrics {
if let Some(action_scheduler) = &scheduler.0 {
start_cleanup_timer(action_scheduler);
// We need a way to prevent our scheduler from having `register_metrics()` called multiple times.
// This is the equivalent of grabbing a uintptr_t in C++, storing it in a set, and checking if it's
// already been visited. We can't use the Arc's pointer directly because it has two interfaces
Expand All @@ -115,18 +111,3 @@ fn inner_scheduler_factory(

Ok(scheduler)
}

/// Spawns a background task that, every 10 seconds, asks the scheduler to
/// clean up its recently completed actions.
///
/// The task only holds a weak reference to `action_scheduler`, so the timer
/// itself does not keep the scheduler alive. On the first tick after the
/// scheduler can no longer be upgraded, the task exits.
fn start_cleanup_timer(action_scheduler: &Arc<dyn ActionScheduler>) {
    let scheduler_ref = Arc::downgrade(action_scheduler);
    background_spawn!("default_scheduler_factory_cleanup_timer", async move {
        let mut cleanup_tick = interval(Duration::from_secs(10));
        loop {
            cleanup_tick.tick().await;
            if let Some(live_scheduler) = scheduler_ref.upgrade() {
                live_scheduler.clean_recently_completed_actions().await;
            } else {
                // Upgrade failed: the service is probably destroyed, so stop the timer.
                return;
            }
        }
    });
}
2 changes: 0 additions & 2 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,4 @@ impl ActionScheduler for GrpcScheduler {
}
}
}

/// No-op: this implementation has an empty body and performs no cleanup itself.
async fn clean_recently_completed_actions(&self) {}
}
4 changes: 0 additions & 4 deletions nativelink-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ impl ActionScheduler for PropertyModifierScheduler {
.await
}

/// Forwards the cleanup request to the wrapped scheduler and waits for it
/// to finish.
async fn clean_recently_completed_actions(&self) {
    let inner = &self.scheduler;
    inner.clean_recently_completed_actions().await;
}

// Register metrics for the underlying ActionScheduler.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
let scheduler_registry = registry.sub_registry_with_prefix("property_modifier");
Expand Down
21 changes: 16 additions & 5 deletions nativelink-scheduler/src/scheduler_state/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub struct AwaitedAction {
/// The time the action was last updated.
last_worker_updated_timestamp: AtomicU64,

/// Number of clients listening to the state of the action.
listening_clients: AtomicUsize,

/// Worker that is currently running this action, None if unassigned.
worker_id: RwLock<Option<WorkerId>>,

Expand Down Expand Up @@ -118,6 +121,7 @@ impl AwaitedAction {
sort_info,
attempts: AtomicUsize::new(0),
last_worker_updated_timestamp: AtomicU64::new(SystemTime::now().unix_timestamp()),
listening_clients: AtomicUsize::new(0),
worker_id: RwLock::new(None),
},
sort_key,
Expand All @@ -139,6 +143,18 @@ impl AwaitedAction {
SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp)
}

/// Returns the current value of the listening-clients counter
/// (loaded with `Acquire` ordering).
pub fn get_listening_clients(&self) -> usize {
    let counter = &self.listening_clients;
    counter.load(Ordering::Acquire)
}

/// Adds one to the listening-clients counter (`Release` ordering).
pub fn inc_listening_clients(&self) {
    // The previous value returned by `fetch_add` is not needed.
    let _previous = self.listening_clients.fetch_add(1, Ordering::Release);
}

/// Subtracts one from the listening-clients counter (`Release` ordering).
///
/// NOTE: `AtomicUsize::fetch_sub` wraps around on underflow, so each call
/// should be paired with an earlier `inc_listening_clients` call.
pub fn dec_listening_clients(&self) {
    // The previous value returned by `fetch_sub` is not needed.
    let _previous = self.listening_clients.fetch_sub(1, Ordering::Release);
}

/// Updates the timestamp of the action.
fn update_worker_timestamp(&self) {
self.last_worker_updated_timestamp
Expand Down Expand Up @@ -192,11 +208,6 @@ impl AwaitedAction {
self.attempts.fetch_add(1, Ordering::Release);
}

// /// Subtracts one from the number of attempts the action has been tried.
// pub fn dec_attempts(&self) {
// self.attempts.fetch_sub(1, Ordering::Release);
// }

/// Gets the worker id that is currently processing this action.
pub fn get_worker_id(&self) -> Option<WorkerId> {
*self.worker_id.read()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,38 @@ use std::sync::Arc;

use async_trait::async_trait;
use nativelink_error::Error;
use nativelink_util::action_messages::{ActionInfo, ActionState};
use nativelink_util::{
action_messages::{ActionInfo, ActionState},
task::JoinHandleDropGuard,
};
use tokio::sync::watch::Receiver;

use crate::operation_state_manager::ActionStateResult;

/// Pairs a watch-channel receiver of `ActionState` updates with an optional
/// keep-alive task guard that is released when this value is dropped.
pub(crate) struct ClientActionStateResult {
/// The receiver for the action state updates.
rx: Receiver<Arc<ActionState>>,

/// Holds a handle to an optional spawn that will be automatically
/// canceled when this struct is dropped.
/// This is primarily used to keep the `EvictionMap` from dropping the
/// struct while a client is listening for updates.
_maybe_keepalive_spawn: Option<JoinHandleDropGuard<()>>,
}

impl ClientActionStateResult {
pub(crate) fn new(mut rx: Receiver<Arc<ActionState>>) -> Self {
pub(crate) fn new(
mut rx: Receiver<Arc<ActionState>>,
maybe_keepalive_spawn: Option<JoinHandleDropGuard<()>>,
) -> Self {
// Marking the initial value as changed for new or existing actions regardless if
// underlying state has changed. This allows for triggering notification after subscription
// without having to use an explicit notification.
rx.mark_changed();
Self { rx }
Self {
rx,
_maybe_keepalive_spawn: maybe_keepalive_spawn,
}
}
}

Expand Down
Loading

0 comments on commit b60e10f

Please sign in to comment.