Skip to content

Commit

Permalink
Rename workers to worker_scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jul 10, 2024
1 parent 1fdd505 commit e50ef3c
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub struct SimpleScheduler {

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
workers: Arc<ApiWorkerScheduler>,
worker_scheduler: Arc<ApiWorkerScheduler>,
}

impl SimpleScheduler {
Expand Down Expand Up @@ -246,7 +246,7 @@ impl SimpleScheduler {
result = result.merge(
match_action_to_worker(
action_state_result.as_ref(),
self.workers.as_ref(),
self.worker_scheduler.as_ref(),
self.matching_engine_state_manager.as_ref(),
)
.await,
Expand Down Expand Up @@ -311,15 +311,15 @@ impl SimpleScheduler {
max_job_retries,
));

let workers = ApiWorkerScheduler::new(
let worker_scheduler = ApiWorkerScheduler::new(
state_manager.clone(),
platform_property_manager.clone(),
scheduler_cfg.allocation_strategy,
tasks_or_worker_change_notify.clone(),
worker_timeout_s,
);

let workers_copy = workers.clone();
let worker_scheduler_clone = worker_scheduler.clone();

let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
let weak_inner = weak_self.clone();
Expand All @@ -345,12 +345,12 @@ impl SimpleScheduler {
SimpleScheduler {
matching_engine_state_manager: state_manager.clone(),
client_state_manager: state_manager.clone(),
workers,
worker_scheduler,
platform_property_manager,
_task_worker_matching_spawn: task_worker_matching_spawn,
}
});
(action_scheduler, workers_copy)
(action_scheduler, worker_scheduler_clone)
}
}

Expand Down Expand Up @@ -391,11 +391,11 @@ impl ActionScheduler for SimpleScheduler {
#[async_trait]
impl WorkerScheduler for SimpleScheduler {
fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
self.workers.get_platform_property_manager()
self.worker_scheduler.get_platform_property_manager()
}

async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
self.workers.add_worker(worker).await
self.worker_scheduler.add_worker(worker).await
}

async fn update_action(
Expand All @@ -404,7 +404,7 @@ impl WorkerScheduler for SimpleScheduler {
operation_id: &OperationId,
action_stage: Result<ActionStage, Error>,
) -> Result<(), Error> {
self.workers
self.worker_scheduler
.update_action(worker_id, operation_id, action_stage)
.await
}
Expand All @@ -414,24 +414,24 @@ impl WorkerScheduler for SimpleScheduler {
worker_id: &WorkerId,
timestamp: WorkerTimestamp,
) -> Result<(), Error> {
self.workers
self.worker_scheduler
.worker_keep_alive_received(worker_id, timestamp)
.await
}

async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
self.workers.remove_worker(worker_id).await
self.worker_scheduler.remove_worker(worker_id).await
}

async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
self.workers.remove_timedout_workers(now_timestamp).await
self.worker_scheduler.remove_timedout_workers(now_timestamp).await
}

async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
self.workers.set_drain_worker(worker_id, is_draining).await
self.worker_scheduler.set_drain_worker(worker_id, is_draining).await
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
self.workers.clone().register_metrics(registry);
self.worker_scheduler.clone().register_metrics(registry);
}
}

0 comments on commit e50ef3c

Please sign in to comment.