diff --git a/nativelink-config/src/schedulers.rs b/nativelink-config/src/schedulers.rs index 76f475734..0af053bdf 100644 --- a/nativelink-config/src/schedulers.rs +++ b/nativelink-config/src/schedulers.rs @@ -98,7 +98,7 @@ pub struct SimpleScheduler { /// a WaitExecution is called after the action has completed. /// Default: 60 (seconds) #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] - pub retain_completed_for_s: u64, + pub retain_completed_for_s: u32, /// Remove workers from pool once the worker has not responded in this /// amount of time in seconds. diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index 622037362..d62dc990c 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -47,16 +47,12 @@ const DEFAULT_WORKER_TIMEOUT_S: u64 = 5; /// Default timeout for recently completed actions in seconds. /// If this changes, remember to change the documentation in the config. -const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60; +const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60; /// Default times a job can retry before failing. /// If this changes, remember to change the documentation in the config. const DEFAULT_MAX_JOB_RETRIES: usize = 3; -/// Default time in seconds before a client is evicted. -// TODO!(make this a config and documented) -const CLIENT_EVICTION_SECONDS: u32 = 300; // 5 mins - struct SimpleSchedulerActionListener { client_operation_id: ClientOperationId, action_state_result: Arc, @@ -119,12 +115,10 @@ pub struct SimpleScheduler { client_state_manager: Arc, platform_property_manager: Arc, - // metrics: Arc, - // Triggers `drop()`` call if scheduler is dropped. - _task_worker_matching_future: JoinHandleDropGuard<()>, - /// The duration that actions are kept in recently_completed_actions for. - _retain_completed_for: Duration, + /// Background task that tries to match actions to workers. If this struct + /// is dropped the spawn will be cancelled as well. + _task_worker_matching_spawn: JoinHandleDropGuard<()>, /// A `Workers` pool that contains all workers that are available to execute actions in a priority /// order based on the allocation strategy. @@ -310,7 +304,7 @@ impl SimpleScheduler { let tasks_or_worker_change_notify = Arc::new(Notify::new()); let state_manager = Arc::new(MemorySchedulerStateManager::new( &EvictionPolicy { - max_seconds: CLIENT_EVICTION_SECONDS, + max_seconds: retain_completed_for_s, ..Default::default() }, tasks_or_worker_change_notify.clone(), @@ -329,7 +323,7 @@ impl SimpleScheduler { let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self { let weak_inner = weak_self.clone(); - let task_worker_matching_future = + let task_worker_matching_spawn = spawn!("simple_scheduler_task_worker_matching", async move { // Break out of the loop only when the inner is dropped. loop { @@ -351,10 +345,9 @@ impl SimpleScheduler { SimpleScheduler { matching_engine_state_manager: state_manager.clone(), client_state_manager: state_manager.clone(), - _retain_completed_for: Duration::new(retain_completed_for_s, 0), workers, platform_property_manager, - _task_worker_matching_future: task_worker_matching_future, + _task_worker_matching_spawn: task_worker_matching_spawn, } }); (action_scheduler, workers_copy)