Skip to content

Commit

Permalink
Add versioning to AwaitedAction (TraceMachina#1163)
Browse files Browse the repository at this point in the history
  • Loading branch information
allada authored Jul 16, 2024
1 parent 73c19c4 commit 080df5d
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 88 deletions.
18 changes: 18 additions & 0 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,18 @@ use nativelink_util::action_messages::{
use nativelink_util::evicting_map::InstantWrapper;
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};

/// The version of the awaited action.
/// This number will always increment by one each time
/// the action is updated.
#[derive(Debug, Clone, Copy)]
struct AwaitedActionVersion(u64);

/// An action that is being awaited on and last known state.
#[derive(Debug, Clone)]
pub struct AwaitedAction {
/// The current version of the action.
version: AwaitedActionVersion,

/// The action that is being awaited on.
action_info: Arc<ActionInfo>,

Expand Down Expand Up @@ -65,6 +74,7 @@ impl AwaitedAction {
id: operation_id.clone(),
});
Self {
version: AwaitedActionVersion(0),
action_info,
operation_id,
sort_key,
Expand All @@ -75,6 +85,14 @@ impl AwaitedAction {
}
}

pub fn version(&self) -> u64 {
self.version.0
}

pub fn increment_version(&mut self) {
self.version = AwaitedActionVersion(self.version.0 + 1);
}

pub fn action_info(&self) -> &Arc<ActionInfo> {
&self.action_info
}
Expand Down
26 changes: 23 additions & 3 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ impl AwaitedActionDbImpl {
.map(|tx| MemoryAwaitedActionSubscriber::new(tx.subscribe()))
}

// TODO!(rename)
fn get_range_of_actions<'a, 'b>(
&'a self,
state: SortedAwaitedActionState,
Expand Down Expand Up @@ -595,6 +594,26 @@ impl AwaitedActionDbImpl {
// Note: It's important to drop old_awaited_action before we call
// send_replace or we will have a deadlock.
let old_awaited_action = tx.borrow();

// Do not process changes if the action version is not in sync with
// what the sender based the update on.
if old_awaited_action.version() + 1 != new_awaited_action.version() {
return Err(make_err!(
// From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
// Use ABORTED if the client should retry at a higher level
// (e.g., when a client-specified test-and-set fails,
// indicating the client should restart a read-modify-write
// sequence)
Code::Aborted,
"{} Expected {:?} but got {:?} for operation_id {:?} - {:?}",
"Tried to update an awaited action with an incorrect version.",
old_awaited_action.version() + 1,
new_awaited_action.version(),
old_awaited_action,
new_awaited_action,
));
}

error_if!(
old_awaited_action.action_info().unique_qualifier
!= new_awaited_action.action_info().unique_qualifier,
Expand All @@ -608,7 +627,6 @@ impl AwaitedActionDbImpl {
.stage
.is_same_stage(&new_awaited_action.state().stage);

// TODO!(Handle priority changes here).
if !is_same_stage {
self.sorted_action_info_hash_keys
.process_state_changes(&old_awaited_action, &new_awaited_action)?;
Expand Down Expand Up @@ -725,7 +743,9 @@ impl AwaitedActionDbImpl {
&mut self,
client_operation_id: &ClientOperationId,
unique_qualifier: &ActionUniqueQualifier,
// TODO!()
// TODO(allada) To simplify the scheduler 2024 refactor, we
// removed the ability to upgrade priorities of actions.
// we should add priority upgrades back in.
_priority: i32,
) -> Result<Option<MemoryAwaitedActionSubscriber>, Error> {
let unique_key = match unique_qualifier {
Expand Down
199 changes: 114 additions & 85 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ use super::awaited_action_db::{
};
use crate::memory_awaited_action_db::{ClientActionStateResult, MatchingEngineActionStateResult};

/// Maximum number of times an update to the database
/// can fail before giving up.
const MAX_UPDATE_RETRIES: usize = 5;

/// Simple struct that implements the ActionStateResult trait and always returns an error.
struct ErrorActionStateResult(Error);

Expand Down Expand Up @@ -149,101 +153,126 @@ impl<T: AwaitedActionDb> SimpleSchedulerStateManager<T> {
maybe_worker_id: Option<&WorkerId>,
action_stage_result: Result<ActionStage, Error>,
) -> Result<(), Error> {
let maybe_awaited_action_subscriber = self
.action_db
.get_by_operation_id(operation_id)
.await
.err_tip(|| "In MemorySchedulerStateManager::update_operation")?;
let awaited_action_subscriber = match maybe_awaited_action_subscriber {
Some(sub) => sub,
// No action found. It is ok if the action was not found. It probably
// means that the action was dropped, but worker was still processing
// it.
None => return Ok(()),
};
let mut last_err = None;
for _ in 0..MAX_UPDATE_RETRIES {
let maybe_awaited_action_subscriber = self
.action_db
.get_by_operation_id(operation_id)
.await
.err_tip(|| "In MemorySchedulerStateManager::update_operation")?;
let awaited_action_subscriber = match maybe_awaited_action_subscriber {
Some(sub) => sub,
// No action found. It is ok if the action was not found. It probably
// means that the action was dropped, but worker was still processing
// it.
None => return Ok(()),
};

let mut awaited_action = awaited_action_subscriber.borrow();
let mut awaited_action = awaited_action_subscriber.borrow();

// Make sure we don't update an action that is already completed.
if awaited_action.state().stage.is_finished() {
return Err(make_err!(
Code::Internal,
"Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
awaited_action.state().stage,
maybe_worker_id,
));
}
// Make sure we don't update an action that is already completed.
if awaited_action.state().stage.is_finished() {
return Err(make_err!(
Code::Internal,
"Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
awaited_action.state().stage,
maybe_worker_id,
));
}

// Make sure the worker id matches the awaited action worker id.
// This might happen if the worker sending the update is not the
// worker that was assigned.
if awaited_action.worker_id().is_some()
&& maybe_worker_id.is_some()
&& maybe_worker_id != awaited_action.worker_id().as_ref()
{
let err = make_err!(
Code::Internal,
"Worker ids do not match - {:?} != {:?} for {:?}",
maybe_worker_id,
awaited_action.worker_id(),
awaited_action,
);
event!(
Level::ERROR,
?operation_id,
?maybe_worker_id,
?awaited_action,
"{}",
err.to_string(),
);
return Err(err);
}
// Make sure the worker id matches the awaited action worker id.
// This might happen if the worker sending the update is not the
// worker that was assigned.
if awaited_action.worker_id().is_some()
&& maybe_worker_id.is_some()
&& maybe_worker_id != awaited_action.worker_id().as_ref()
{
let err = make_err!(
Code::Internal,
"Worker ids do not match - {:?} != {:?} for {:?}",
maybe_worker_id,
awaited_action.worker_id(),
awaited_action,
);
event!(
Level::ERROR,
?operation_id,
?maybe_worker_id,
?awaited_action,
"{}",
err.to_string(),
);
return Err(err);
}

let stage = match action_stage_result {
Ok(stage) => stage,
Err(err) => {
// Don't count a backpressure failure as an attempt for an action.
let due_to_backpressure = err.code == Code::ResourceExhausted;
if !due_to_backpressure {
awaited_action.attempts += 1;
let stage = match &action_stage_result {
Ok(stage) => stage.clone(),
Err(err) => {
// Don't count a backpressure failure as an attempt for an action.
let due_to_backpressure = err.code == Code::ResourceExhausted;
if !due_to_backpressure {
awaited_action.attempts += 1;
}

if awaited_action.attempts > self.max_job_retries {
ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: maybe_worker_id.map_or_else(String::default, |v| v.to_string()),
..ExecutionMetadata::default()
},
error: Some(err.clone().merge(make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed {}",
format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
))),
..ActionResult::default()
})
} else {
ActionStage::Queued
}
}
};
if matches!(stage, ActionStage::Queued) {
// If the action is queued, we need to unset the worker id regardless of
// which worker sent the update.
awaited_action.set_worker_id(None);
} else {
awaited_action.set_worker_id(maybe_worker_id.copied());
}
awaited_action.set_state(Arc::new(ActionState {
stage,
id: operation_id.clone(),
}));
awaited_action.increment_version();

if awaited_action.attempts > self.max_job_retries {
ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: maybe_worker_id.map_or_else(String::default, |v| v.to_string()),
..ExecutionMetadata::default()
},
error: Some(err.clone().merge(make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed {}",
format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
))),
..ActionResult::default()
})
let update_action_result = self
.action_db
.update_awaited_action(awaited_action)
.await
.err_tip(|| "In MemorySchedulerStateManager::update_operation");
if let Err(err) = update_action_result {
// We use Aborted to signal that the action was not
// updated due to the data being set was not the latest
// but can be retried.
if err.code == Code::Aborted {
last_err = Some(err);
continue;
} else {
ActionStage::Queued
return Err(err);
}
}
};
if matches!(stage, ActionStage::Queued) {
// If the action is queued, we need to unset the worker id regardless of
// which worker sent the update.
awaited_action.set_worker_id(None);
} else {
awaited_action.set_worker_id(maybe_worker_id.copied());
}
awaited_action.set_state(Arc::new(ActionState {
stage,
id: operation_id.clone(),
}));
self.action_db
.update_awaited_action(awaited_action)
.await
.err_tip(|| "In MemorySchedulerStateManager::update_operation")?;

self.tasks_change_notify.notify_one();
Ok(())
self.tasks_change_notify.notify_one();
return Ok(());
}
match last_err {
Some(err) => Err(err),
None => Err(make_err!(
Code::Internal,
"Failed to update action after {} retries with no error set",
MAX_UPDATE_RETRIES,
)),
}
}

async fn inner_add_operation(
Expand Down

0 comments on commit 080df5d

Please sign in to comment.