diff --git a/nativelink-scheduler/src/operation_state_manager.rs b/nativelink-scheduler/src/operation_state_manager.rs
index 7baa5f156..5435aca60 100644
--- a/nativelink-scheduler/src/operation_state_manager.rs
+++ b/nativelink-scheduler/src/operation_state_manager.rs
@@ -98,7 +98,7 @@ pub struct OrderBy {
 pub type ActionStateResultStream = Pin<Box<dyn Stream<Item = Arc<dyn ActionStateResult>> + Send>>;

 #[async_trait]
-pub trait ClientStateManager {
+pub trait ClientStateManager: Sync + Send + 'static {
     /// Add a new action to the queue or joins an existing action.
     async fn add_action(
         &self,
@@ -113,7 +113,7 @@ pub trait ClientStateManager {
 }

 #[async_trait]
-pub trait WorkerStateManager {
+pub trait WorkerStateManager: Sync + Send + 'static {
     /// Update that state of an operation.
     /// The worker must also send periodic updates even if the state
     /// did not change with a modified timestamp in order to prevent
@@ -127,7 +127,7 @@ pub trait WorkerStateManager {
 }

 #[async_trait]
-pub trait MatchingEngineStateManager {
+pub trait MatchingEngineStateManager: Sync + Send + 'static {
     /// Returns a stream of operations that match the filter.
     async fn filter_operations(
         &self,
diff --git a/nativelink-scheduler/src/scheduler_state/state_manager.rs b/nativelink-scheduler/src/scheduler_state/state_manager.rs
index 087fc5e93..554af61ec 100644
--- a/nativelink-scheduler/src/scheduler_state/state_manager.rs
+++ b/nativelink-scheduler/src/scheduler_state/state_manager.rs
@@ -21,10 +21,9 @@ use async_lock::Mutex;
 use async_trait::async_trait;
 use futures::stream;
 use hashbrown::{HashMap, HashSet};
-use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
+use nativelink_error::{make_err, Code, Error, ResultExt};
 use nativelink_util::action_messages::{
-    ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
-    OperationId, WorkerId,
+    ActionInfo, ActionInfoHashKey, ActionStage, ActionState, OperationId, WorkerId,
 };
 use tokio::sync::watch::error::SendError;
 use tokio::sync::{watch, Notify};
@@ -39,8 +38,6 @@ use crate::scheduler_state::client_action_state_result::ClientActionStateResult;
 use crate::scheduler_state::completed_action::CompletedAction;
 use crate::scheduler_state::matching_engine_action_state_result::MatchingEngineActionStateResult;
 use crate::scheduler_state::metrics::Metrics;
-use crate::scheduler_state::workers::Workers;
-use crate::worker::WorkerUpdate;

 #[repr(transparent)]
 pub(crate) struct StateManager {
@@ -52,22 +49,18 @@ impl StateManager {
     pub(crate) fn new(
         queued_actions_set: HashSet<Arc<ActionInfo>>,
         queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,
-        workers: Workers,
         active_actions: HashMap<Arc<ActionInfo>, AwaitedAction>,
         recently_completed_actions: HashSet<CompletedAction>,
         metrics: Arc<Metrics>,
-        max_job_retries: usize,
         tasks_change_notify: Arc<Notify>,
     ) -> Self {
         Self {
             inner: Mutex::new(StateManagerImpl {
                 queued_actions_set,
                 queued_actions,
-                workers,
                 active_actions,
                 recently_completed_actions,
                 metrics,
-                max_job_retries,
                 tasks_change_notify,
             }),
         }
@@ -102,10 +95,6 @@ pub(crate) struct StateManagerImpl {
     /// Important: `queued_actions_set` and `queued_actions` must be kept in sync.
     pub(crate) queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,

-    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
-    /// order based on the allocation strategy.
-    pub(crate) workers: Workers,
-
     /// A map of all actions that are active. A hashmap is used to find actions that are active in
     /// O(1) time. The key is the `ActionInfo` struct. The value is the `AwaitedAction` struct.
     pub(crate) active_actions: HashMap<Arc<ActionInfo>, AwaitedAction>,

@@ -118,9 +107,6 @@ pub(crate) struct StateManagerImpl {

     pub(crate) metrics: Arc<Metrics>,

-    /// Default times a job can retry before failing.
-    pub(crate) max_job_retries: usize,
-
     /// Notify task<->worker matching engine that work needs to be done.
     pub(crate) tasks_change_notify: Arc<Notify>,
 }
@@ -199,123 +185,6 @@ fn mutate_priority(action_info: &mut Arc<ActionInfo>, priority: i32) {
 }

 impl StateManagerImpl {
-    fn immediate_evict_worker(&mut self, worker_id: &WorkerId, err: Error) {
-        if let Some(mut worker) = self.workers.remove_worker(worker_id) {
-            self.metrics.workers_evicted.inc();
-            // We don't care if we fail to send message to worker, this is only a best attempt.
-            let _ = worker.notify_update(WorkerUpdate::Disconnect);
-            // We create a temporary Vec to avoid doubt about a possible code
-            // path touching the worker.running_action_infos elsewhere.
-            for action_info in worker.running_action_infos.drain() {
-                self.metrics.workers_evicted_with_running_action.inc();
-                self.retry_action(&action_info, worker_id, err.clone());
-            }
-            // Note: Calling this multiple times is very cheap, it'll only trigger `do_try_match` once.
-            self.tasks_change_notify.notify_one();
-        }
-    }
-
-    fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId, err: Error) {
-        match self.active_actions.remove(action_info) {
-            Some(running_action) => {
-                let mut awaited_action = running_action;
-                let send_result = if awaited_action.attempts >= self.max_job_retries {
-                    self.metrics.retry_action_max_attempts_reached.inc();
-                    Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Completed(ActionResult {
-                        execution_metadata: ExecutionMetadata {
-                            worker: format!("{worker_id}"),
-                            ..ExecutionMetadata::default()
-                        },
-                        error: Some(err.merge(make_err!(
-                            Code::Internal,
-                            "Job cancelled because it attempted to execute too many times and failed"
-                        ))),
-                        ..ActionResult::default()
-                    });
-                    awaited_action
-                        .notify_channel
-                        .send(awaited_action.current_state.clone())
-                    // Do not put the action back in the queue here, as this action attempted to run too many
-                    // times.
-                } else {
-                    self.metrics.retry_action.inc();
-                    Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Queued;
-                    let send_result = awaited_action
-                        .notify_channel
-                        .send(awaited_action.current_state.clone());
-                    self.queued_actions_set.insert(action_info.clone());
-                    self.queued_actions
-                        .insert(action_info.clone(), awaited_action);
-                    send_result
-                };
-
-                if send_result.is_err() {
-                    self.metrics.retry_action_no_more_listeners.inc();
-                    // Don't remove this task, instead we keep them around for a bit just in case
-                    // the client disconnected and will reconnect and ask for same job to be executed
-                    // again.
-                    event!(
-                        Level::WARN,
-                        ?action_info,
-                        ?worker_id,
-                        "Action has no more listeners during evict_worker()"
-                    );
-                }
-            }
-            None => {
-                self.metrics.retry_action_but_action_missing.inc();
-                event!(
-                    Level::ERROR,
-                    ?action_info,
-                    ?worker_id,
-                    "Worker stated it was running an action, but it was not in the active_actions"
-                );
-            }
-        }
-    }
-
-    /// Notifies the specified worker to run the given action and handles errors by evicting
-    /// the worker if the notification fails.
-    ///
-    /// # Note
-    ///
-    /// Intended utility function for matching engine.
-    ///
-    /// # Errors
-    ///
-    /// This function will return an error if the notification to the worker fails, and in that case,
-    /// the worker will be immediately evicted from the system.
-    ///
-    async fn worker_notify_run_action(
-        &mut self,
-        worker_id: WorkerId,
-        action_info: Arc<ActionInfo>,
-    ) -> Result<(), Error> {
-        if let Some(worker) = self.workers.workers.get_mut(&worker_id) {
-            let notify_worker_result =
-                worker.notify_update(WorkerUpdate::RunAction(action_info.clone()));
-
-            if notify_worker_result.is_err() {
-                event!(
-                    Level::WARN,
-                    ?worker_id,
-                    ?action_info,
-                    ?notify_worker_result,
-                    "Worker command failed, removing worker",
-                );
-
-                let err = make_err!(
-                    Code::Internal,
-                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
-                );
-
-                self.immediate_evict_worker(&worker_id, err.clone());
-                return Err(err);
-            }
-        }
-        Ok(())
-    }
-
     /// Marks the specified action as active, assigns it to the given worker, and updates the
     /// action stage. This function removes the action from the queue, updates the action's state
     /// or error, and inserts it into the set of active actions.
@@ -404,7 +273,7 @@ impl StateManagerImpl {
                 ?action_info_hash_key,
                 ?worker_id,
                 "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker",
-                );
+            );
             return;
         };
         if running_action_worker_id == *worker_id {
@@ -428,20 +297,6 @@ impl StateManagerImpl {
         self.active_actions
             .insert(action_info.clone(), running_action);

-        // Clear this action from the current worker.
-        if let Some(worker) = self.workers.workers.get_mut(worker_id) {
-            let was_paused = !worker.can_accept_work();
-            // This unpauses, but since we're completing with an error, don't
-            // unpause unless all actions have completed.
-            worker.complete_action(&action_info);
-            // Only pause if there's an action still waiting that will unpause.
-            if (was_paused || due_to_backpressure) && worker.has_actions() {
-                worker.is_paused = true;
-            }
-        }
-
-        // Re-queue the action or fail on max attempts.
-        self.retry_action(&action_info, worker_id, err);
         self.tasks_change_notify.notify_one();
     }
 }
@@ -567,12 +422,10 @@ impl WorkerStateManager for StateManager {
                 ?action_stage,
                 "Worker sent error while updating action. Removing worker"
             );
-            let err = make_err!(
-                Code::Internal,
-                "Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
-            );
-            inner.immediate_evict_worker(&worker_id, err.clone());
-            return Err(err);
+            return Err(make_err!(
+                Code::Internal,
+                "Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
+            ));
         }

         let (action_info, mut running_action) = inner
@@ -585,15 +438,14 @@ impl WorkerStateManager for StateManager {
         if running_action.worker_id != Some(worker_id) {
             inner.metrics.update_action_from_wrong_worker.inc();
             let err = match running_action.worker_id {
-                Some(running_action_worker_id) => make_err!(
-                        Code::Internal,
-                        "Got a result from a worker that should not be running the action, Removing worker. Expected worker {running_action_worker_id} got worker {worker_id}",
-                    ),
+                Some(running_action_worker_id) => make_err!(
+                    Code::Internal,
+                    "Got a result from a worker that should not be running the action, Removing worker. Expected worker {running_action_worker_id} got worker {worker_id}",
+                ),
                 None => make_err!(
-                        Code::Internal,
-                        "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}",
-                    ),
+                    Code::Internal,
+                    "Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}",
+                ),
             };
             event!(
                 Level::ERROR,
@@ -605,7 +457,6 @@ impl WorkerStateManager for StateManager {
             );

             // First put it back in our active_actions or we will drop the task.
             inner.active_actions.insert(action_info, running_action);
-            inner.immediate_evict_worker(&worker_id, err.clone());
             return Err(err);
         }
@@ -635,10 +486,6 @@ impl WorkerStateManager for StateManager {
             state: running_action.current_state,
         });

-        let worker = inner.workers.workers.get_mut(&worker_id).ok_or_else(|| {
-            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
-        })?;
-        worker.complete_action(&action_info);
         inner.tasks_change_notify.notify_one();
         Ok(())
     }
@@ -689,9 +536,6 @@ impl MatchingEngineStateManager for StateManager {
         if let Some(action_info) = inner.queued_actions_set.get(&operation_id.unique_qualifier) {
             if let Some(worker_id) = worker_id {
                 let action_info = action_info.clone();
-                inner
-                    .worker_notify_run_action(worker_id, action_info.clone())
-                    .await?;
                 inner
                     .worker_set_as_active(action_info, worker_id, action_stage)
                     .await?;
diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs
index 33f9e0122..dd4f261e1 100644
--- a/nativelink-scheduler/src/simple_scheduler.rs
+++ b/nativelink-scheduler/src/simple_scheduler.rs
@@ -65,13 +65,16 @@ const DEFAULT_MAX_JOB_RETRIES: usize = 3;

 struct SimpleSchedulerImpl {
     /// The manager responsible for holding the state of actions and workers.
-    state_manager: StateManager,
+    state_manager: Arc<StateManager>,
     /// The duration that actions are kept in recently_completed_actions for.
     retain_completed_for: Duration,
     /// Timeout of how long to evict workers if no response in this given amount of time in seconds.
     worker_timeout_s: u64,
     /// Default times a job can retry before failing.
     max_job_retries: usize,
+    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
+    /// order based on the allocation strategy.
+    workers: Workers,
     metrics: Arc<Metrics>,
 }

@@ -216,12 +219,13 @@ impl SimpleSchedulerImpl {
     /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
     fn immediate_evict_worker(
         inner_state: &mut MutexGuard<'_, StateManagerImpl>,
+        workers: &mut Workers,
         max_job_retries: usize,
         metrics: &Metrics,
         worker_id: &WorkerId,
         err: Error,
     ) {
-        if let Some(mut worker) = inner_state.workers.remove_worker(worker_id) {
+        if let Some(mut worker) = workers.remove_worker(worker_id) {
             metrics.workers_evicted.inc();
             // We don't care if we fail to send message to worker, this is only a best attempt.
             let _ = worker.notify_update(WorkerUpdate::Disconnect);
@@ -249,15 +253,75 @@ impl SimpleSchedulerImpl {
         worker_id: WorkerId,
         is_draining: bool,
     ) -> Result<(), Error> {
-        let mut inner_state = self.state_manager.inner.lock().await;
-        let worker = inner_state
+        let worker = self
             .workers
             .workers
             .get_mut(&worker_id)
             .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
         self.metrics.workers_drained.inc();
         worker.is_draining = is_draining;
-        inner_state.tasks_change_notify.notify_one();
+        self.state_manager
+            .inner
+            .lock()
+            .await
+            .tasks_change_notify
+            .notify_one();
+        Ok(())
+    }
+
+    /// Notifies the specified worker to run the given action and handles errors by evicting
+    /// the worker if the notification fails.
+    ///
+    /// # Note
+    ///
+    /// Intended utility function for matching engine.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if the notification to the worker fails, and in that case,
+    /// the worker will be immediately evicted from the system.
+    ///
+    async fn worker_notify_run_action(
+        &mut self,
+        worker_id: WorkerId,
+        action_info: Arc<ActionInfo>,
+    ) -> Result<(), Error> {
+        if let Some(worker) = self.workers.workers.get_mut(&worker_id) {
+            let notify_worker_result =
+                worker.notify_update(WorkerUpdate::RunAction(action_info.clone()));
+
+            if notify_worker_result.is_err() {
+                event!(
+                    Level::WARN,
+                    ?worker_id,
+                    ?action_info,
+                    ?notify_worker_result,
+                    "Worker command failed, removing worker",
+                );
+
+                let err = make_err!(
+                    Code::Internal,
+                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
+                );
+
+                let max_job_retries = self.max_job_retries;
+                let metrics = self.metrics.clone();
+                // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                // removed.
+                let state_manager = self.state_manager.clone();
+                let mut inner_state = state_manager.inner.lock().await;
+                SimpleSchedulerImpl::immediate_evict_worker(
+                    &mut inner_state,
+                    &mut self.workers,
+                    max_job_retries,
+                    &metrics,
+                    &worker_id,
+                    err.clone(),
+                );
+                return Err(err);
+            }
+        }
         Ok(())
     }

@@ -320,13 +384,9 @@ impl SimpleSchedulerImpl {
                 continue;
             };

-            let maybe_worker_id: Option<WorkerId> = {
-                let inner_state = self.state_manager.inner.lock().await;
-
-                inner_state
-                    .workers
-                    .find_worker_for_action(&action_info.platform_properties)
-            };
+            let maybe_worker_id = self
+                .workers
+                .find_worker_for_action(&action_info.platform_properties);

             let operation_id = state.id.clone();
             let ret = <StateManager as MatchingEngineStateManager>::update_operation(
@@ -338,12 +398,43 @@ impl SimpleSchedulerImpl {
             .await;

             if let Err(e) = ret {
+                if let Some(worker_id) = maybe_worker_id {
+                    let max_job_retries = self.max_job_retries;
+                    let metrics = self.metrics.clone();
+                    // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                    // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                    // removed.
+                    let state_manager = self.state_manager.clone();
+                    let mut inner_state = state_manager.inner.lock().await;
+                    SimpleSchedulerImpl::immediate_evict_worker(
+                        &mut inner_state,
+                        &mut self.workers,
+                        max_job_retries,
+                        &metrics,
+                        &worker_id,
+                        e.clone(),
+                    );
+                }
+
                 event!(
                     Level::ERROR,
                     ?e,
                     "update operation failed for {}",
                     operation_id
                 );
+            } else if let Some(worker_id) = maybe_worker_id {
+                if let Err(err) = self
+                    .worker_notify_run_action(worker_id, action_info.clone())
+                    .await
+                {
+                    event!(
+                        Level::ERROR,
+                        ?err,
+                        ?worker_id,
+                        ?action_info,
+                        "failed to run worker_notify_run_action in SimpleSchedulerImpl::do_try_match"
+                    );
+                }
             }
         }
     }
@@ -359,14 +450,63 @@ impl SimpleSchedulerImpl {
         action_info_hash_key: ActionInfoHashKey,
         action_stage: Result<ActionStage, Error>,
     ) -> Result<(), Error> {
+        let worker = self.workers.workers.get_mut(worker_id).err_tip(|| {
+            format!("Worker {worker_id} does not exist in SimpleSchedulerImpl::update_action")
+        })?;
+        let action_info_res = worker
+            .running_action_infos
+            .get(&action_info_hash_key)
+            .err_tip(|| format!("Action {action_info_hash_key:?} should not be running on worker {worker_id} in SimpleSchedulerImpl::update_action"));
+        let action_info = match action_info_res {
+            Ok(action_info) => action_info.clone(),
+            Err(err) => {
+                let max_job_retries = self.max_job_retries;
+                let metrics = self.metrics.clone();
+                // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                // removed.
+                let state_manager = self.state_manager.clone();
+                let mut inner_state = state_manager.inner.lock().await;
+                SimpleSchedulerImpl::immediate_evict_worker(
+                    &mut inner_state,
+                    &mut self.workers,
+                    max_job_retries,
+                    &metrics,
+                    worker_id,
+                    err.clone(),
+                );
+                return Err(err);
+            }
+        };
+        let operation_id = OperationId::new(action_info_hash_key.clone());
+        let due_to_backpressure = action_stage
+            .as_ref()
+            .map_or_else(|e| e.code == Code::ResourceExhausted, |_| false);
         let update_operation_result = <StateManager as WorkerStateManager>::update_operation(
             &self.state_manager,
-            OperationId::new(action_info_hash_key.clone()),
+            operation_id.clone(),
             *worker_id,
-            action_stage,
+            action_stage.clone(),
         )
-        .await;
-        if let Err(e) = &update_operation_result {
+        .await
+        .err_tip(|| "in update_operation on SimpleSchedulerImpl::update_action");
+        if let Err(e) = update_operation_result {
+            let max_job_retries = self.max_job_retries;
+            let metrics = self.metrics.clone();
+            // TODO(allada) This is to get around rust borrow checker with double mut borrows
+            // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+            // removed.
+            let state_manager = self.state_manager.clone();
+            let mut inner_state = state_manager.inner.lock().await;
+            SimpleSchedulerImpl::immediate_evict_worker(
+                &mut inner_state,
+                &mut self.workers,
+                max_job_retries,
+                &metrics,
+                worker_id,
+                e.clone(),
+            );
+
             event!(
                 Level::ERROR,
                 ?action_info_hash_key,
@@ -374,8 +514,43 @@ impl SimpleSchedulerImpl {
                 ?worker_id,
                 ?e,
                 "Failed to update_operation on update_action"
             );
+            return Err(e);
+        }
+
+        match action_stage {
+            Ok(_) => worker.complete_action(&action_info),
+            Err(err) => {
+                // Clear this action from the current worker.
+                let was_paused = !worker.can_accept_work();
+                // This unpauses, but since we're completing with an error, don't
+                // unpause unless all actions have completed.
+                // Note: We need to run this before dealing with backpressure logic.
+                worker.complete_action(&action_info);
+                // Only pause if there's an action still waiting that will unpause.
+                if (was_paused || due_to_backpressure) && worker.has_actions() {
+                    worker.is_paused = true;
+                }
+
+                let max_job_retries = self.max_job_retries;
+                let metrics = self.metrics.clone();
+                // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                // removed.
+                let state_manager = self.state_manager.clone();
+                let mut inner_state = state_manager.inner.lock().await;
+                // Re-queue the action or fail on max attempts.
+                SimpleSchedulerImpl::retry_action(
+                    &mut inner_state,
+                    max_job_retries,
+                    &metrics,
+                    &action_info,
+                    worker_id,
+                    err,
+                );
+            }
         }
-        update_operation_result
+
+        Ok(())
     }
 }
@@ -436,16 +611,14 @@ impl SimpleScheduler {
         }

         let tasks_change_notify = Arc::new(Notify::new());
-        let state_manager = StateManager::new(
+        let state_manager = Arc::new(StateManager::new(
             HashSet::new(),
             BTreeMap::new(),
-            Workers::new(scheduler_cfg.allocation_strategy),
             HashMap::new(),
             HashSet::new(),
             Arc::new(SchedulerMetrics::default()),
-            max_job_retries,
             tasks_change_notify.clone(),
-        );
+        ));
         let metrics = Arc::new(Metrics::default());
         let metrics_for_do_try_match = metrics.clone();
         let inner = Arc::new(Mutex::new(SimpleSchedulerImpl {
@@ -454,6 +627,7 @@ impl SimpleScheduler {
             worker_timeout_s,
             max_job_retries,
             metrics: metrics.clone(),
+            workers: Workers::new(scheduler_cfg.allocation_strategy),
         }));
         let weak_inner = Arc::downgrade(&inner);
         Self {
@@ -493,8 +667,7 @@ impl SimpleScheduler {
     #[must_use]
     pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
         let inner_scheduler = self.get_inner_lock().await;
-        let inner_state = inner_scheduler.state_manager.inner.lock().await;
-        inner_state.workers.workers.contains(worker_id)
+        inner_scheduler.workers.workers.contains(worker_id)
     }

     /// A unit test function used to send the keep alive message to the worker from the server.
@@ -502,9 +675,8 @@ impl SimpleScheduler {
         &self,
         worker_id: &WorkerId,
     ) -> Result<(), Error> {
-        let inner_scheduler = self.get_inner_lock().await;
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
-        let worker = inner_state
+        let mut inner_scheduler = self.get_inner_lock().await;
+        let worker = inner_scheduler
             .workers
             .workers
             .get_mut(worker_id)
@@ -594,17 +766,22 @@ impl WorkerScheduler for SimpleScheduler {

     async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
         let worker_id = worker.id;
-        let inner_scheduler = self.get_inner_lock().await;
+        let mut inner_scheduler = self.get_inner_lock().await;
         let max_job_retries = inner_scheduler.max_job_retries;
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
+        // TODO(allada) This is to get around rust borrow checker with double mut borrows
+        // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+        // removed.
+        let state_manager = inner_scheduler.state_manager.clone();
+        let mut inner_state = state_manager.inner.lock().await;
         self.metrics.add_worker.wrap(move || {
-            let res = inner_state
+            let res = inner_scheduler
                 .workers
                 .add_worker(worker)
                 .err_tip(|| "Error while adding worker, removing from pool");
             if let Err(err) = &res {
                 SimpleSchedulerImpl::immediate_evict_worker(
                     &mut inner_state,
+                    &mut inner_scheduler.workers,
                     max_job_retries,
                     &self.metrics,
                     &worker_id,
@@ -634,68 +811,82 @@ impl WorkerScheduler for SimpleScheduler {
         worker_id: &WorkerId,
         timestamp: WorkerTimestamp,
     ) -> Result<(), Error> {
-        let inner_scheduler = self.get_inner_lock().await;
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
-        inner_state
+        let mut inner_scheduler = self.get_inner_lock().await;
+        inner_scheduler
             .workers
             .refresh_lifetime(worker_id, timestamp)
             .err_tip(|| "Error refreshing lifetime in worker_keep_alive_received()")
     }

     async fn remove_worker(&self, worker_id: WorkerId) {
-        let inner_scheduler = self.get_inner_lock().await;
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
+        let mut inner_scheduler = self.get_inner_lock().await;
+        let max_job_retries = inner_scheduler.max_job_retries;
+        let metrics = inner_scheduler.metrics.clone();
+        // TODO(allada) This is to get around rust borrow checker with double mut borrows
+        // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+        // removed.
+        let state_manager = inner_scheduler.state_manager.clone();
+        let mut inner_state = state_manager.inner.lock().await;
         SimpleSchedulerImpl::immediate_evict_worker(
             &mut inner_state,
-            inner_scheduler.max_job_retries,
-            &inner_scheduler.metrics,
+            &mut inner_scheduler.workers,
+            max_job_retries,
+            &metrics,
             &worker_id,
             make_err!(Code::Internal, "Received request to remove worker"),
         );
     }

     async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
-        let inner_scheduler = self.get_inner_lock().await;
+        let mut inner_scheduler = self.get_inner_lock().await;
         let worker_timeout_s = inner_scheduler.worker_timeout_s;
         let max_job_retries = inner_scheduler.max_job_retries;
         let metrics = inner_scheduler.metrics.clone();
-        let mut inner_state = inner_scheduler.state_manager.inner.lock().await;
-        self.metrics.remove_timedout_workers.wrap(move || {
-            // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
-            // map most of the time.
-            let worker_ids_to_remove: Vec<WorkerId> = inner_state
-                .workers
-                .workers
-                .iter()
-                .rev()
-                .map_while(|(worker_id, worker)| {
-                    if worker.last_update_timestamp <= now_timestamp - worker_timeout_s {
-                        Some(*worker_id)
-                    } else {
-                        None
-                    }
-                })
-                .collect();
-            for worker_id in &worker_ids_to_remove {
-                event!(
-                    Level::WARN,
-                    ?worker_id,
-                    "Worker timed out, removing from pool"
-                );
-                SimpleSchedulerImpl::immediate_evict_worker(
-                    &mut inner_state,
-                    max_job_retries,
-                    &metrics,
-                    worker_id,
-                    make_err!(
-                        Code::Internal,
-                        "Worker {worker_id} timed out, removing from pool"
-                    ),
-                );
-            }
+        self.metrics
+            .remove_timedout_workers
+            .wrap(async move {
+                // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
+                // map most of the time.
+                let worker_ids_to_remove: Vec<WorkerId> = inner_scheduler
+                    .workers
+                    .workers
+                    .iter()
+                    .rev()
+                    .map_while(|(worker_id, worker)| {
+                        if worker.last_update_timestamp <= now_timestamp - worker_timeout_s {
+                            Some(*worker_id)
+                        } else {
+                            None
+                        }
+                    })
+                    .collect();
+                // TODO(allada) This is to get around rust borrow checker with double mut borrows
+                // of a mutex lock. Once the scheduler is fully moved to state manager this can be
+                // removed.
+                let state_manager = inner_scheduler.state_manager.clone();
+                let mut inner_state = state_manager.inner.lock().await;
+                for worker_id in &worker_ids_to_remove {
+                    event!(
+                        Level::WARN,
+                        ?worker_id,
+                        "Worker timed out, removing from pool"
+                    );
+                    SimpleSchedulerImpl::immediate_evict_worker(
+                        &mut inner_state,
+                        &mut inner_scheduler.workers,
+                        max_job_retries,
+                        &metrics,
+                        worker_id,
+                        make_err!(
+                            Code::Internal,
+                            "Worker {worker_id} timed out, removing from pool"
+                        ),
+                    );
+                }

-            Ok(())
-        })
+                Ok(())
+            })
+            .await
     }

     async fn set_drain_worker(&self, worker_id: WorkerId, is_draining: bool) -> Result<(), Error> {
@@ -724,7 +915,7 @@ impl MetricsComponent for SimpleScheduler {
         );
         c.publish(
             "workers_total",
-            &inner_state.workers.workers.len(),
+            &inner_scheduler.workers.workers.len(),
             "The number workers active.",
         );
         c.publish(
@@ -753,7 +944,7 @@ impl MetricsComponent for SimpleScheduler {
             "The amount of times a job is allowed to retry from an internal error before it is dropped.",
         );
         let mut props = HashMap::<&String, u64>::new();
-        for (_worker_id, worker) in inner_state.workers.workers.iter() {
+        for (_worker_id, worker) in inner_scheduler.workers.workers.iter() {
             c.publish_with_labels(
                 "workers",
                 worker,
@@ -806,7 +997,7 @@ struct Metrics {
     existing_actions_found: CounterWithTime,
     existing_actions_not_found: CounterWithTime,
     clean_recently_completed_actions: CounterWithTime,
-    remove_timedout_workers: FuncCounterWrapper,
+    remove_timedout_workers: AsyncCounterWrapper,
     update_action: AsyncCounterWrapper,
     update_action_with_internal_error: CounterWithTime,
     update_action_with_internal_error_no_action: CounterWithTime,
diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs
index a7993831b..3e5afe8ab 100644
--- a/nativelink-scheduler/tests/simple_scheduler_test.rs
+++ b/nativelink-scheduler/tests/simple_scheduler_test.rs
@@ -1003,6 +1003,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> {
         // Other tests check full data. We only care if client thinks we are Executing.
         assert_eq!(client_rx.borrow_and_update().stage, ActionStage::Executing);
     }
+    let _ = setup_new_worker(&scheduler, rogue_worker_id, PlatformProperties::default()).await?;

     let action_info_hash_key = ActionInfoHashKey {
         instance_name: INSTANCE_NAME.to_string(),
@@ -1043,8 +1044,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> {
         .await;

     {
-        const EXPECTED_ERR: &str =
-            "Got a result from a worker that should not be running the action";
+        const EXPECTED_ERR: &str = "should not be running on worker";
         // Our request should have sent an error back.
         assert!(
             update_action_result.is_err(),
@@ -1503,10 +1503,13 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> {
                     output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
                 },
                 server_logs: HashMap::default(),
-                error: Some(err.merge(make_err!(
-                    Code::Internal,
-                    "Job cancelled because it attempted to execute too many times and failed"
-                ))),
+                error: Some(
+                    err.append("in update_operation on SimpleSchedulerImpl::update_action")
+                        .merge(make_err!(
+                            Code::Internal,
+                            "Job cancelled because it attempted to execute too many times and failed"
+                        )),
+                ),
                 message: String::new(),
             }),
         };
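
For reviewers, a minimal sketch (not part of the patch) of the eviction call pattern this diff converges on: the `Workers` pool now lives on `SimpleSchedulerImpl`, and the `StateManagerImpl` lock is taken through a cloned `Arc<StateManager>` to sidestep the double mutable borrow flagged in the TODO(allada) comments. The wrapper function `evict_worker_example` is hypothetical; the types, fields, and `immediate_evict_worker` signature are the ones introduced above.

// Hypothetical wrapper illustrating the calling convention used throughout this diff.
async fn evict_worker_example(
    scheduler: &mut SimpleSchedulerImpl,
    worker_id: WorkerId,
    err: Error,
) {
    // Copy/clone the small pieces of state needed by the helper first.
    let max_job_retries = scheduler.max_job_retries;
    let metrics = scheduler.metrics.clone();
    // Clone the Arc so the lock guard borrows the local `state_manager` rather than
    // `scheduler`, leaving `&mut scheduler.workers` free for the call below.
    let state_manager = scheduler.state_manager.clone();
    let mut inner_state = state_manager.inner.lock().await;
    SimpleSchedulerImpl::immediate_evict_worker(
        &mut inner_state,
        &mut scheduler.workers,
        max_job_retries,
        &metrics,
        &worker_id,
        err,
    );
}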