diff --git a/nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto b/nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto index 0005e7d0c..09598d780 100644 --- a/nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto +++ b/nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto @@ -124,10 +124,10 @@ message ConnectionResult { reserved 2; // NextId. } -/// Request to kill a running action sent from the scheduler to a worker. -message KillActionRequest { - /// The the hex encoded unique qualifier for the action to be killed. - string action_id = 1; +/// Request to kill a running operation sent from the scheduler to a worker. +message KillOperationRequest { + /// The the operation id for the operation to be killed. + string operation_id = 1; reserved 2; // NextId. } /// Communication from the scheduler to the worker. @@ -152,8 +152,8 @@ message UpdateForWorker { /// The worker may discard any outstanding work that is being executed. google.protobuf.Empty disconnect = 4; - /// Instructs the worker to kill a specific running action. - KillActionRequest kill_action_request = 5; + /// Instructs the worker to kill a specific running operation. + KillOperationRequest kill_operation_request = 5; } reserved 6; // NextId. } diff --git a/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs b/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs index ee6da601a..268b5e3ce 100644 --- a/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs +++ b/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs @@ -94,13 +94,13 @@ pub struct ConnectionResult { #[prost(string, tag = "1")] pub worker_id: ::prost::alloc::string::String, } -/// / Request to kill a running action sent from the scheduler to a worker. +/// / Request to kill a running operation sent from the scheduler to a worker. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct KillActionRequest { - /// / The the hex encoded unique qualifier for the action to be killed. +pub struct KillOperationRequest { + /// / The the operation id for the operation to be killed. #[prost(string, tag = "1")] - pub action_id: ::prost::alloc::string::String, + pub operation_id: ::prost::alloc::string::String, } /// / Communication from the scheduler to the worker. #[allow(clippy::derive_partial_eq_without_eq)] @@ -133,9 +133,9 @@ pub mod update_for_worker { /// / The worker may discard any outstanding work that is being executed. #[prost(message, tag = "4")] Disconnect(()), - /// / Instructs the worker to kill a specific running action. + /// / Instructs the worker to kill a specific running operation. #[prost(message, tag = "5")] - KillActionRequest(super::KillActionRequest), + KillOperationRequest(super::KillOperationRequest), } } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index e1938eb5d..b0e14bfe8 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -72,8 +72,8 @@ fn update_eq(expected: UpdateForWorker, actual: UpdateForWorker, ignore_id: bool } _ => false, }, - update_for_worker::Update::KillActionRequest(actual_update) => match expected_update { - update_for_worker::Update::KillActionRequest(expected_update) => { + update_for_worker::Update::KillOperationRequest(actual_update) => match expected_update { + update_for_worker::Update::KillOperationRequest(expected_update) => { expected_update == actual_update } _ => false, diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index eb5b52ac4..28fcc0b51 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -211,20 +211,29 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, Update::KeepAlive(()) => { self.metrics.keep_alives_received.inc(); } - Update::KillActionRequest(kill_action_request) => { - let mut action_id = [0u8; 32]; - hex::decode_to_slice(kill_action_request.action_id, &mut action_id as &mut [u8]) - .map_err(|e| make_input_err!( - "KillActionRequest failed to decode ActionId hex with error {}", - e - ))?; - - if let Err(err) = self.running_actions_manager.kill_action(action_id).await { + Update::KillOperationRequest(kill_operation_request) => { + let operation_id_res = kill_operation_request + .operation_id + .as_str() + .try_into(); + let operation_id = match operation_id_res { + Ok(operation_id) => operation_id, + Err(err) => { + event!( + Level::ERROR, + ?kill_operation_request, + ?err, + "Failed to convert string to operation_id" + ); + continue; + } + }; + if let Err(err) = self.running_actions_manager.kill_operation(&operation_id).await { event!( Level::ERROR, - action_id = hex::encode(action_id), + ?operation_id, ?err, - "Failed to send kill request for action" + "Failed to send kill request for operation" ); }; } @@ -257,7 +266,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, .and_then(|action| { event!( Level::INFO, - action_id = hex::encode(action.get_action_id()), + operation_id = ?action.get_operation_id(), "Received request to run action" ); action diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 84aeda895..a3c979b2f 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -56,7 +56,7 @@ use nativelink_store::filesystem_store::{FileEntry, FilesystemStore}; use nativelink_store::grpc_store::GrpcStore; use nativelink_util::action_messages::{ to_execute_response, ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, - NameOrPath, SymlinkInfo, + NameOrPath, OperationId, SymlinkInfo, }; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc}; @@ -78,8 +78,6 @@ use tonic::Request; use tracing::{enabled, event, Level}; use uuid::Uuid; -pub type ActionId = [u8; 32]; - /// For simplicity we use a fixed exit code for cases when our program is terminated /// due to a signal. const EXIT_CODE_FOR_SIGNAL: i32 = 9; @@ -531,7 +529,7 @@ async fn process_side_channel_file( async fn do_cleanup( running_actions_manager: &RunningActionsManagerImpl, - action_id: &ActionId, + operation_id: &OperationId, action_directory: &str, ) -> Result<(), Error> { event!(Level::INFO, "Worker cleaning up"); @@ -539,10 +537,10 @@ async fn do_cleanup( let remove_dir_result = fs::remove_dir_all(action_directory) .await .err_tip(|| format!("Could not remove working directory {action_directory}")); - if let Err(err) = running_actions_manager.cleanup_action(action_id) { + if let Err(err) = running_actions_manager.cleanup_action(operation_id) { event!( Level::ERROR, - action_id = hex::encode(action_id), + ?operation_id, ?err, "Error cleaning up action" ); @@ -551,7 +549,7 @@ async fn do_cleanup( if let Err(err) = remove_dir_result { event!( Level::ERROR, - action_id = hex::encode(action_id), + ?operation_id, ?err, "Error removing working directory" ); @@ -562,7 +560,7 @@ async fn do_cleanup( pub trait RunningAction: Sync + Send + Sized + Unpin + 'static { /// Returns the action id of the action. - fn get_action_id(&self) -> &ActionId; + fn get_operation_id(&self) -> &OperationId; /// Anything that needs to execute before the actions is actually executed should happen here. fn prepare_action(self: Arc) -> impl Future, Error>> + Send; @@ -610,8 +608,9 @@ struct RunningActionImplState { error: Option, } +// TODO!(rename operation) pub struct RunningActionImpl { - action_id: ActionId, + operation_id: OperationId, action_directory: String, work_directory: String, action_info: ActionInfo, @@ -624,7 +623,7 @@ pub struct RunningActionImpl { impl RunningActionImpl { fn new( execution_metadata: ExecutionMetadata, - action_id: ActionId, + operation_id: OperationId, action_directory: String, action_info: ActionInfo, timeout: Duration, @@ -633,7 +632,7 @@ impl RunningActionImpl { let work_directory = format!("{}/{}", action_directory, "work"); let (kill_channel_tx, kill_channel_rx) = oneshot::channel(); Self { - action_id, + operation_id, action_directory, work_directory, action_info, @@ -988,14 +987,14 @@ impl RunningActionImpl { if let Err(err) = child_process_guard.start_kill() { event!( Level::ERROR, - action_id = hex::encode(self.action_id), + operation_id = ?self.operation_id, ?err, "Could not kill process", ); } else { event!( Level::ERROR, - action_id = hex::encode(self.action_id), + operation_id = ?self.operation_id, "Could not get child process id, maybe already dead?", ); } @@ -1250,23 +1249,23 @@ impl Drop for RunningActionImpl { if self.did_cleanup.load(Ordering::Acquire) { return; } + let operation_id = self.operation_id.clone(); event!( Level::ERROR, - action_id = hex::encode(self.action_id), + ?operation_id, "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background." ); let running_actions_manager = self.running_actions_manager.clone(); - let action_id = self.action_id; let action_directory = self.action_directory.clone(); background_spawn!("running_action_impl_drop", async move { let Err(err) = - do_cleanup(&running_actions_manager, &action_id, &action_directory).await + do_cleanup(&running_actions_manager, &operation_id, &action_directory).await else { return; }; event!( Level::ERROR, - action_id = hex::encode(action_id), + ?operation_id, ?action_directory, ?err, "Error cleaning up action" @@ -1276,8 +1275,8 @@ impl Drop for RunningActionImpl { } impl RunningAction for RunningActionImpl { - fn get_action_id(&self) -> &ActionId { - &self.action_id + fn get_operation_id(&self) -> &OperationId { + &self.operation_id } async fn prepare_action(self: Arc) -> Result, Error> { @@ -1311,7 +1310,7 @@ impl RunningAction for RunningActionImpl { .wrap(async move { let result = do_cleanup( &self.running_actions_manager, - &self.action_id, + &self.operation_id, &self.action_directory, ) .await; @@ -1352,7 +1351,10 @@ pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static { fn kill_all(&self) -> impl Future + Send; - fn kill_action(&self, action_id: ActionId) -> impl Future> + Send; + fn kill_operation( + &self, + operation_id: &OperationId, + ) -> impl Future> + Send; fn metrics(&self) -> &Arc; } @@ -1643,7 +1645,7 @@ pub struct RunningActionsManagerImpl { upload_action_results: UploadActionResults, max_action_timeout: Duration, timeout_handled_externally: bool, - running_actions: Mutex>>, + running_actions: Mutex>>, // Note: We don't use Notify because we need to support a .wait_for()-like function, which // Notify does not support. action_done_tx: watch::Sender<()>, @@ -1699,11 +1701,10 @@ impl RunningActionsManagerImpl { fn make_action_directory<'a>( &'a self, - action_id: &'a ActionId, + operation_id: &'a OperationId, ) -> impl Future> + 'a { self.metrics.make_action_directory.wrap(async move { - let action_directory = - format!("{}/{}", self.root_action_directory, hex::encode(action_id)); + let action_directory = format!("{}/{}", self.root_action_directory, operation_id.id); fs::create_dir(&action_directory) .await .err_tip(|| format!("Error creating action directory {action_directory}"))?; @@ -1742,10 +1743,10 @@ impl RunningActionsManagerImpl { }) } - fn cleanup_action(&self, action_id: &ActionId) -> Result<(), Error> { + fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> { let mut running_actions = self.running_actions.lock(); - let result = running_actions.remove(action_id).err_tip(|| { - format!("Expected action id '{action_id:?}' to exist in RunningActionsManagerImpl") + let result = running_actions.remove(operation_id).err_tip(|| { + format!("Expected action id '{operation_id:?}' to exist in RunningActionsManagerImpl") }); // No need to copy anything, we just are telling the receivers an event happened. self.action_done_tx.send_modify(|_| {}); @@ -1754,11 +1755,11 @@ impl RunningActionsManagerImpl { // Note: We do not capture metrics on this call, only `.kill_all()`. // Important: When the future returns the process may still be running. - async fn kill_action(action: Arc) { + async fn kill_operation(action: Arc) { event!( Level::WARN, - action_id = ?hex::encode(action.action_id), - "Sending kill to running action", + operation_id = ?action.operation_id, + "Sending kill to running operation", ); let kill_channel_tx = { let mut action_state = action.state.lock(); @@ -1768,8 +1769,8 @@ impl RunningActionsManagerImpl { if kill_channel_tx.send(()).is_err() { event!( Level::ERROR, - action_id = ?hex::encode(action.action_id), - "Error sending kill to running action", + operation_id = ?action.operation_id, + "Error sending kill to running operation", ); } } @@ -1792,14 +1793,18 @@ impl RunningActionsManager for RunningActionsManagerImpl { .clone() .and_then(|time| time.try_into().ok()) .unwrap_or(SystemTime::UNIX_EPOCH); + let operation_id: OperationId = start_execute + .operation_id + .as_str() + .try_into() + .err_tip(|| "Could not convert to operation_id in RunningActionsManager::create_and_add_action")?; let action_info = self.create_action_info(start_execute, queued_timestamp).await?; event!( Level::INFO, ?action_info, "Worker received action", ); - let action_id = action_info.unique_qualifier.get_hash(); - let action_directory = self.make_action_directory(&action_id).await?; + let action_directory = self.make_action_directory(&operation_id).await?; let execution_metadata = ExecutionMetadata { worker: worker_id, queued_timestamp: action_info.insert_timestamp, @@ -1827,7 +1832,7 @@ impl RunningActionsManager for RunningActionsManagerImpl { } let running_action = Arc::new(RunningActionImpl::new( execution_metadata, - action_id, + operation_id.clone(), action_directory, action_info, timeout, @@ -1835,7 +1840,7 @@ impl RunningActionsManager for RunningActionsManagerImpl { )); { let mut running_actions = self.running_actions.lock(); - running_actions.insert(action_id, Arc::downgrade(&running_action)); + running_actions.insert(operation_id, Arc::downgrade(&running_action)); } Ok(running_action) }) @@ -1858,17 +1863,15 @@ impl RunningActionsManager for RunningActionsManagerImpl { .await } - async fn kill_action(&self, action_id: ActionId) -> Result<(), Error> { + async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> { let running_action = { let running_actions = self.running_actions.lock(); running_actions - .get(&action_id) + .get(operation_id) .and_then(|action| action.upgrade()) - .ok_or_else(|| { - make_input_err!("Failed to get running action {}", hex::encode(action_id)) - })? + .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))? }; - Self::kill_action(running_action).await; + Self::kill_operation(running_action).await; Ok(()) } @@ -1877,15 +1880,15 @@ impl RunningActionsManager for RunningActionsManagerImpl { self.metrics .kill_all .wrap_no_capture_result(async move { - let kill_actions: Vec> = { + let kill_operations: Vec> = { let running_actions = self.running_actions.lock(); running_actions .iter() - .filter_map(|(_action_id, action)| action.upgrade()) + .filter_map(|(_operation_id, action)| action.upgrade()) .collect() }; - for action in kill_actions { - Self::kill_action(action).await; + for action in kill_operations { + Self::kill_operation(action).await; } }) .await; diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index 80c8c8482..321f853f5 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -35,14 +35,14 @@ use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::platform::Property; use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update; use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{ - execute_result, ConnectionResult, ExecuteResult, KillActionRequest, StartExecute, + execute_result, ConnectionResult, ExecuteResult, KillOperationRequest, StartExecute, SupportedProperties, UpdateForWorker, }; use nativelink_store::fast_slow_store::FastSlowStore; use nativelink_store::filesystem_store::FilesystemStore; use nativelink_store::memory_store::MemoryStore; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata, + ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata, OperationId, }; use nativelink_util::common::{encode_stream_proto, fs, DigestInfo}; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -676,13 +676,14 @@ async fn kill_action_request_kills_action() -> Result<(), Box Result<(), Box SystemTime { previous_time } +fn make_operation_id(execute_request: &ExecuteRequest) -> OperationId { + let unique_qualifier = ActionInfoHashKey { + instance_name: execute_request.instance_name.clone(), + digest_function: execute_request.digest_function.try_into().unwrap(), + digest: execute_request + .action_digest + .clone() + .unwrap() + .try_into() + .unwrap(), + salt: 0, + }; + OperationId::new(unique_qualifier) +} + #[nativelink_test] async fn download_to_directory_file_download_test() -> Result<(), Box> { const FILE1_NAME: &str = "file1.txt"; @@ -443,7 +459,6 @@ async fn ensure_output_files_full_directories_are_created_no_working_directory_t }, )?); { - let operation_id = "55".to_string(); let command = Command { arguments: vec!["touch".to_string(), "./some/path/test.txt".to_string()], output_files: vec!["some/path/test.txt".to_string()], @@ -487,15 +502,17 @@ async fn ensure_output_files_full_directories_are_created_no_working_directory_t ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action = running_actions_manager .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: None, }, @@ -557,7 +574,6 @@ async fn ensure_output_files_full_directories_are_created_test( }, )?); { - let operation_id = "55".to_string(); let working_directory = "some_cwd"; let command = Command { arguments: vec!["touch".to_string(), "./some/path/test.txt".to_string()], @@ -603,15 +619,17 @@ async fn ensure_output_files_full_directories_are_created_test( ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action = running_actions_manager .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: None, }, @@ -673,7 +691,6 @@ async fn blake3_upload_files() -> Result<(), Box> { }, )?); let action_result = { - let operation_id = "55".to_string(); #[cfg(target_family = "unix")] let arguments = vec![ "sh".to_string(), @@ -734,15 +751,18 @@ async fn blake3_upload_files() -> Result<(), Box> { ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + digest_function: ProtoDigestFunction::Blake3.into(), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action_impl = running_actions_manager .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Blake3.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: None, }, @@ -844,7 +864,6 @@ async fn upload_files_from_above_cwd_test() -> Result<(), Box Result<(), Box Result<(), Box> )?); let queued_timestamp = make_system_time(1000); let action_result = { - let operation_id = "55".to_string(); let command = Command { arguments: vec![ "sh".to_string(), @@ -1060,15 +1080,17 @@ async fn upload_dir_and_symlink_test() -> Result<(), Box> ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action_impl = running_actions_manager .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: Some(queued_timestamp.into()), }, @@ -1223,7 +1245,6 @@ async fn cleanup_happens_on_job_failure() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { const WORKER_ID: &str = "foo_worker_id"; - let operation_id = "55".to_string(); let (_, _, cas_store, ac_store) = setup_stores().await?; let root_action_directory = make_temp_path("root_action_directory"); @@ -1383,16 +1405,18 @@ async fn kill_ends_action() -> Result<(), Box> { ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action_impl = running_actions_manager .clone() .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: Some(make_system_time(1000).into()), }, @@ -1445,7 +1469,6 @@ echo | set /p=\"Wrapper script did run\" 1>&2 exit 0 "; const WORKER_ID: &str = "foo_worker_id"; - let operation_id = "66".to_string(); const EXPECTED_STDOUT: &str = "Action did run"; let (_, _, cas_store, ac_store) = setup_stores().await?; @@ -1528,16 +1551,18 @@ exit 0 ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action_impl = running_actions_manager .clone() .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: Some(make_system_time(1000).into()), }, @@ -1587,7 +1612,6 @@ echo | set /p=\"Wrapper script did run with property %PROPERTY% %VALUE% %INNER_T exit 0 "; const WORKER_ID: &str = "foo_worker_id"; - let operation_id = "66".to_string(); const EXPECTED_STDOUT: &str = "Action did run"; let (_, _, cas_store, ac_store) = setup_stores().await?; @@ -1694,16 +1718,18 @@ exit 0 ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action_impl = running_actions_manager .clone() .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: Some(make_system_time(1000).into()), }, @@ -1751,7 +1777,6 @@ echo | set /p={\"failure\":\"timeout\"} 1>&2 > %SIDE_CHANNEL_FILE% exit 1 "; const WORKER_ID: &str = "foo_worker_id"; - let operation_id = "66".to_string(); let (_, _, cas_store, ac_store) = setup_stores().await?; let root_action_directory = make_temp_path("root_action_directory"); @@ -1833,16 +1858,18 @@ exit 1 ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action_impl = running_actions_manager .clone() .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: Some(make_system_time(1000).into()), }, @@ -2346,16 +2373,18 @@ async fn ensure_worker_timeout_chooses_correct_values() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let execute_results_fut = running_actions_manager .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), - operation_id: "".to_string(), + execute_request: Some(execute_request), + operation_id, queued_timestamp: Some(make_system_time(1000).into()), }, ) @@ -2740,18 +2775,20 @@ async fn kill_all_waits_for_all_tasks_to_finish() -> Result<(), Box Result<(), Box> { ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action_impl = running_actions_manager .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), + operation_id, ..Default::default() }, ) @@ -2946,7 +2986,6 @@ async fn action_directory_contents_are_cleaned() -> Result<(), Box Result<(), Box Result<(), Box> { }, )?); let action_result = { - let operation_id = "55".to_string(); #[cfg(target_family = "unix")] let arguments = vec![ "sh".to_string(), @@ -3110,15 +3150,17 @@ async fn upload_with_single_permit() -> Result<(), Box> { ) .await?; + let execute_request = ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }; + let operation_id = make_operation_id(&execute_request).to_string(); + let running_action_impl = running_actions_manager .create_and_add_action( WORKER_ID.to_string(), StartExecute { - execute_request: Some(ExecuteRequest { - action_digest: Some(action_digest.into()), - digest_function: ProtoDigestFunction::Sha256.into(), - ..Default::default() - }), + execute_request: Some(execute_request), operation_id, queued_timestamp: None, }, @@ -3196,7 +3238,6 @@ async fn upload_with_single_permit() -> Result<(), Box> { async fn running_actions_manager_respects_action_timeout() -> Result<(), Box> { const WORKER_ID: &str = "foo_worker_id"; - let operation_id = "66".to_string(); let (_, _, cas_store, ac_store) = setup_stores().await?; let root_action_directory = make_temp_path("root_work_directory"); @@ -3282,16 +3323,18 @@ async fn running_actions_manager_respects_action_timeout() -> Result<(), Box>, tx_kill_all: mpsc::UnboundedSender<()>, - rx_kill_action: Mutex>, - tx_kill_action: mpsc::UnboundedSender, + rx_kill_operation: Mutex>, + tx_kill_operation: mpsc::UnboundedSender, metrics: Arc, } @@ -61,7 +59,7 @@ impl MockRunningActionsManager { let (tx_call, rx_call) = mpsc::unbounded_channel(); let (tx_resp, rx_resp) = mpsc::unbounded_channel(); let (tx_kill_all, rx_kill_all) = mpsc::unbounded_channel(); - let (tx_kill_action, rx_kill_action) = mpsc::unbounded_channel(); + let (tx_kill_operation, rx_kill_operation) = mpsc::unbounded_channel(); Self { rx_call: Mutex::new(rx_call), tx_call, @@ -69,8 +67,8 @@ impl MockRunningActionsManager { tx_resp, rx_kill_all: Mutex::new(rx_kill_all), tx_kill_all, - rx_kill_action: Mutex::new(rx_kill_action), - tx_kill_action, + rx_kill_operation: Mutex::new(rx_kill_operation), + tx_kill_operation, metrics: Arc::new(Metrics::default()), } } @@ -116,9 +114,9 @@ impl MockRunningActionsManager { .expect("Could not receive msg in mpsc"); } - pub async fn expect_kill_action(&self) -> ActionId { - let mut rx_kill_action_lock = self.rx_kill_action.lock().await; - rx_kill_action_lock + pub async fn expect_kill_operation(&self) -> OperationId { + let mut rx_kill_operation_lock = self.rx_kill_operation.lock().await; + rx_kill_operation_lock .recv() .await .expect("Could not receive msg in mpsc") @@ -165,9 +163,9 @@ impl RunningActionsManager for MockRunningActionsManager { Ok(()) } - async fn kill_action(&self, action_id: ActionId) -> Result<(), Error> { - self.tx_kill_action - .send(action_id) + async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> { + self.tx_kill_operation + .send(operation_id.clone()) .expect("Could not send request to mpsc"); Ok(()) } @@ -344,7 +342,7 @@ impl MockRunningAction { } impl RunningAction for MockRunningAction { - fn get_action_id(&self) -> &ActionId { + fn get_operation_id(&self) -> &OperationId { unreachable!("not implemented for tests"); }