Skip to content

Commit

Permalink
Remove all need for workers to know about ActionId (#1125)
Browse files Browse the repository at this point in the history
Workers should only talk OperationId.
  • Loading branch information
allada authored Jul 10, 2024
1 parent 5482d7f commit 3b86036
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ message ConnectionResult {
reserved 2; // NextId.
}

/// Request to kill a running action sent from the scheduler to a worker.
message KillActionRequest {
/// The the hex encoded unique qualifier for the action to be killed.
string action_id = 1;
/// Request to kill a running operation sent from the scheduler to a worker.
message KillOperationRequest {
/// The the operation id for the operation to be killed.
string operation_id = 1;
reserved 2; // NextId.
}
/// Communication from the scheduler to the worker.
Expand All @@ -152,8 +152,8 @@ message UpdateForWorker {
/// The worker may discard any outstanding work that is being executed.
google.protobuf.Empty disconnect = 4;

/// Instructs the worker to kill a specific running action.
KillActionRequest kill_action_request = 5;
/// Instructs the worker to kill a specific running operation.
KillOperationRequest kill_operation_request = 5;
}
reserved 6; // NextId.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ pub struct ConnectionResult {
#[prost(string, tag = "1")]
pub worker_id: ::prost::alloc::string::String,
}
/// / Request to kill a running action sent from the scheduler to a worker.
/// / Request to kill a running operation sent from the scheduler to a worker.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KillActionRequest {
/// / The the hex encoded unique qualifier for the action to be killed.
pub struct KillOperationRequest {
/// / The the operation id for the operation to be killed.
#[prost(string, tag = "1")]
pub action_id: ::prost::alloc::string::String,
pub operation_id: ::prost::alloc::string::String,
}
/// / Communication from the scheduler to the worker.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -133,9 +133,9 @@ pub mod update_for_worker {
/// / The worker may discard any outstanding work that is being executed.
#[prost(message, tag = "4")]
Disconnect(()),
/// / Instructs the worker to kill a specific running action.
/// / Instructs the worker to kill a specific running operation.
#[prost(message, tag = "5")]
KillActionRequest(super::KillActionRequest),
KillOperationRequest(super::KillOperationRequest),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ fn update_eq(expected: UpdateForWorker, actual: UpdateForWorker, ignore_id: bool
}
_ => false,
},
update_for_worker::Update::KillActionRequest(actual_update) => match expected_update {
update_for_worker::Update::KillActionRequest(expected_update) => {
update_for_worker::Update::KillOperationRequest(actual_update) => match expected_update {
update_for_worker::Update::KillOperationRequest(expected_update) => {
expected_update == actual_update
}
_ => false,
Expand Down
33 changes: 21 additions & 12 deletions nativelink-worker/src/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,20 +211,29 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
Update::KeepAlive(()) => {
self.metrics.keep_alives_received.inc();
}
Update::KillActionRequest(kill_action_request) => {
let mut action_id = [0u8; 32];
hex::decode_to_slice(kill_action_request.action_id, &mut action_id as &mut [u8])
.map_err(|e| make_input_err!(
"KillActionRequest failed to decode ActionId hex with error {}",
e
))?;

if let Err(err) = self.running_actions_manager.kill_action(action_id).await {
Update::KillOperationRequest(kill_operation_request) => {
let operation_id_res = kill_operation_request
.operation_id
.as_str()
.try_into();
let operation_id = match operation_id_res {
Ok(operation_id) => operation_id,
Err(err) => {
event!(
Level::ERROR,
?kill_operation_request,
?err,
"Failed to convert string to operation_id"
);
continue;
}
};
if let Err(err) = self.running_actions_manager.kill_operation(&operation_id).await {
event!(
Level::ERROR,
action_id = hex::encode(action_id),
?operation_id,
?err,
"Failed to send kill request for action"
"Failed to send kill request for operation"
);
};
}
Expand Down Expand Up @@ -257,7 +266,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
.and_then(|action| {
event!(
Level::INFO,
action_id = hex::encode(action.get_action_id()),
operation_id = ?action.get_operation_id(),
"Received request to run action"
);
action
Expand Down
Loading

0 comments on commit 3b86036

Please sign in to comment.