Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SimpleScheduler now uses config for action pruning #1137

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub struct SimpleScheduler {
/// a WaitExecution is called after the action has completed.
/// Default: 60 (seconds)
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub retain_completed_for_s: u64,
pub retain_completed_for_s: u32,

/// Remove workers from pool once the worker has not responded in this
/// amount of time in seconds.
Expand Down
21 changes: 7 additions & 14 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,12 @@ const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

/// Default timeout for recently completed actions in seconds.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60;
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;

/// Default times a job can retry before failing.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;

/// Default time in seconds before a client is evicted.
// TODO!(make this a config and documented)
const CLIENT_EVICTION_SECONDS: u32 = 300; // 5 mins

struct SimpleSchedulerActionListener {
client_operation_id: ClientOperationId,
action_state_result: Arc<dyn ActionStateResult>,
Expand Down Expand Up @@ -119,12 +115,10 @@ pub struct SimpleScheduler {
client_state_manager: Arc<dyn ClientStateManager>,

platform_property_manager: Arc<PlatformPropertyManager>,
// metrics: Arc<Metrics>,
// Triggers `drop()`` call if scheduler is dropped.
_task_worker_matching_future: JoinHandleDropGuard<()>,

/// The duration that actions are kept in recently_completed_actions for.
_retain_completed_for: Duration,
/// Background task that tries to match actions to workers. If this struct
/// is dropped the spawn will be cancelled as well.
_task_worker_matching_spawn: JoinHandleDropGuard<()>,

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
Expand Down Expand Up @@ -310,7 +304,7 @@ impl SimpleScheduler {
let tasks_or_worker_change_notify = Arc::new(Notify::new());
let state_manager = Arc::new(MemorySchedulerStateManager::new(
&EvictionPolicy {
max_seconds: CLIENT_EVICTION_SECONDS,
max_seconds: retain_completed_for_s,
..Default::default()
},
tasks_or_worker_change_notify.clone(),
Expand All @@ -329,7 +323,7 @@ impl SimpleScheduler {

let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
let weak_inner = weak_self.clone();
let task_worker_matching_future =
let task_worker_matching_spawn =
spawn!("simple_scheduler_task_worker_matching", async move {
// Break out of the loop only when the inner is dropped.
loop {
Expand All @@ -351,10 +345,9 @@ impl SimpleScheduler {
SimpleScheduler {
matching_engine_state_manager: state_manager.clone(),
client_state_manager: state_manager.clone(),
_retain_completed_for: Duration::new(retain_completed_for_s, 0),
workers,
platform_property_manager,
_task_worker_matching_future: task_worker_matching_future,
_task_worker_matching_spawn: task_worker_matching_spawn,
}
});
(action_scheduler, workers_copy)
Expand Down