From ec204fa3b795ff60fe13d55f79ec75b178a7b1cc Mon Sep 17 00:00:00 2001 From: Marcin S Date: Sun, 11 Dec 2022 10:55:23 -0500 Subject: [PATCH 01/18] Replace async-std with tokio in PVF subsystem --- Cargo.lock | 92 ++---------- node/core/pvf/Cargo.toml | 3 +- node/core/pvf/src/artifacts.rs | 19 ++- node/core/pvf/src/execute/queue.rs | 3 +- node/core/pvf/src/execute/worker.rs | 49 +++---- node/core/pvf/src/host.rs | 30 ++-- node/core/pvf/src/prepare/pool.rs | 9 +- node/core/pvf/src/prepare/queue.rs | 14 +- node/core/pvf/src/prepare/worker.rs | 73 +++++----- node/core/pvf/src/worker_common.rs | 178 +++++++++++++----------- node/core/pvf/tests/it/adder.rs | 8 +- node/core/pvf/tests/it/main.rs | 10 +- node/core/pvf/tests/it/worker_common.rs | 4 +- 13 files changed, 211 insertions(+), 281 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac2bd158e616..dfd900640bac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,16 +168,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-attributes" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "async-channel" version = "1.6.1" @@ -221,10 +211,11 @@ dependencies = [ [[package]] name = "async-io" -version = "1.6.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b" +checksum = "83e21f3a490c72b3b0cf44962180e60045de2925d8dff97918f7ee43c8f637c7" dependencies = [ + "autocfg", "concurrent-queue", "futures-lite", "libc", @@ -256,50 +247,6 @@ dependencies = [ "event-listener", ] -[[package]] -name = "async-process" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"83137067e3a2a6a06d67168e49e68a0957d215410473a740cea95a2425c0b7c6" -dependencies = [ - "async-io", - "blocking", - "cfg-if", - "event-listener", - "futures-lite", - "libc", - "once_cell", - "signal-hook", - "winapi", -] - -[[package]] -name = "async-std" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" -dependencies = [ - "async-attributes", - "async-channel", - "async-global-executor", - "async-io", - "async-lock", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite 0.2.7", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - [[package]] name = "async-task" version = "4.0.3" @@ -2595,19 +2542,6 @@ dependencies = [ "regex", ] -[[package]] -name = "gloo-timers" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "group" version = "0.12.1" @@ -3070,7 +3004,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", - "tokio-util 0.7.1", + "tokio-util 0.7.4", "tracing", "webpki-roots", ] @@ -3178,7 +3112,7 @@ dependencies = [ "soketto", "tokio", "tokio-stream", - "tokio-util 0.7.1", + "tokio-util 0.7.4", "tracing", "tracing-futures", ] @@ -3318,15 +3252,6 @@ dependencies = [ "sp-weights", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "kvdb" version = "0.13.0" @@ -6431,8 +6356,6 @@ version = "0.9.33" dependencies = [ "always-assert", "assert_matches", - "async-process", - "async-std", 
"cpu-time", "futures", "futures-timer", @@ -6457,6 +6380,7 @@ dependencies = [ "tempfile", "test-parachain-adder", "test-parachain-halt", + "tokio", "tracing-gum", ] @@ -11300,9 +11224,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.1" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" +checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ "bytes", "futures-core", diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index 2aaf408ae56d..e00092826428 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -10,8 +10,6 @@ path = "bin/puppet_worker.rs" [dependencies] always-assert = "0.1" -async-std = { version = "1.11.0", features = ["attributes"] } -async-process = "1.3.0" assert_matches = "1.4.0" cpu-time = "1.0.0" futures = "0.3.21" @@ -21,6 +19,7 @@ gum = { package = "tracing-gum", path = "../../gum" } pin-project = "1.0.9" rand = "0.8.5" tempfile = "3.3.0" +tokio = { version = "1.22.0", features = ["fs", "process"] } rayon = "1.5.1" parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] } diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index 413d73b4c558..297ed0829cca 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -16,10 +16,10 @@ use crate::{error::PrepareError, host::PrepareResultSender}; use always_assert::always; -use async_std::path::{Path, PathBuf}; use polkadot_parachain::primitives::ValidationCodeHash; use std::{ collections::HashMap, + path::{Path, PathBuf}, time::{Duration, SystemTime}, }; @@ -136,8 +136,8 @@ impl Artifacts { pub async fn new(cache_path: &Path) -> Self { // Make sure that the cache path directory and all its parents are created. // First delete the entire cache. Nodes are long-running so this should populate shortly. 
- let _ = async_std::fs::remove_dir_all(cache_path).await; - let _ = async_std::fs::create_dir_all(cache_path).await; + let _ = tokio::fs::remove_dir_all(cache_path).await; + let _ = tokio::fs::create_dir_all(cache_path).await; Self { artifacts: HashMap::new() } } @@ -214,9 +214,8 @@ impl Artifacts { #[cfg(test)] mod tests { use super::{ArtifactId, Artifacts}; - use async_std::path::Path; use sp_core::H256; - use std::str::FromStr; + use std::{path::Path, str::FromStr}; #[test] fn from_file_name() { @@ -252,11 +251,9 @@ mod tests { ); } - #[test] - fn artifacts_removes_cache_on_startup() { - let fake_cache_path = async_std::task::block_on(async move { - crate::worker_common::tmpfile("test-cache").await.unwrap() - }); + #[tokio::test] + async fn artifacts_removes_cache_on_startup() { + let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap(); let fake_artifact_path = { let mut p = fake_cache_path.clone(); p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234"); @@ -271,7 +268,7 @@ mod tests { // this should remove it and re-create. let p = &fake_cache_path; - async_std::task::block_on(async { Artifacts::new(p).await }); + Artifacts::new(p).await; assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0); diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index 72b6e450351b..f2f1b4e0cfff 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -24,7 +24,6 @@ use crate::{ worker_common::{IdleWorker, WorkerHandle}, InvalidCandidate, ValidationError, LOG_TARGET, }; -use async_std::path::PathBuf; use futures::{ channel::mpsc, future::BoxFuture, @@ -32,7 +31,7 @@ use futures::{ Future, FutureExt, }; use slotmap::HopSlotMap; -use std::{collections::VecDeque, fmt, time::Duration}; +use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration}; slotmap::new_key_type! 
{ struct Worker; } diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 105accf18e2b..eb9f5dcb2a06 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -24,25 +24,18 @@ use crate::{ }, LOG_TARGET, }; -use async_std::{ - io, - os::unix::net::UnixStream, - path::{Path, PathBuf}, - task, -}; use cpu_time::ProcessTime; use futures::FutureExt; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationResult; use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + path::{Path, PathBuf}, + sync::Arc, thread, time::Duration, }; +use tokio::{io, net::UnixStream, runtime::Runtime, sync::Mutex}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -235,13 +228,19 @@ impl Response { /// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { - worker_event_loop("execute", socket_path, |mut stream| async move { + worker_event_loop("execute", socket_path, |stream| async move { let executor = Executor::new().map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) })?; + let mutex = Arc::new(Mutex::new((stream, false))); loop { - let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; + let (artifact_path, params, execution_timeout) = { + let mut lock = mutex.lock().await; + // Unset the lock flag. We set it when either thread finishes. + lock.1 = false; + recv_request(&mut lock.0).await? + }; gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), @@ -249,24 +248,22 @@ pub fn worker_entrypoint(socket_path: &str) { artifact_path.display(), ); - // Create a lock flag. We set it when either thread finishes. 
- let lock = Arc::new(AtomicBool::new(false)); let cpu_time_start = ProcessTime::now(); // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from // sleeping and then either sleeps for the remaining CPU time, or kills the process if // we exceed the CPU timeout. - let (stream_2, cpu_time_start_2, execution_timeout_2, lock_2) = - (stream.clone(), cpu_time_start, execution_timeout, lock.clone()); + let (mutex_2, cpu_time_start_2, execution_timeout_2) = + (mutex.clone(), cpu_time_start, execution_timeout); let handle = thread::Builder::new().name("CPU time monitor".into()).spawn(move || { - task::block_on(async { + let rt = Runtime::new().unwrap(); + rt.block_on(async { cpu_time_monitor_loop( JobKind::Execute, - stream_2, + mutex_2, cpu_time_start_2, execution_timeout_2, - lock_2, ) .await; }) @@ -275,18 +272,16 @@ pub fn worker_entrypoint(socket_path: &str) { let response = validate_using_artifact(&artifact_path, ¶ms, &executor, cpu_time_start).await; - let lock_result = - lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if lock_result.is_err() { - // The other thread is still sending an error response over the socket. Wait on it - // and return. - let _ = handle.join(); - // Monitor thread detected timeout and likely already terminated the process, + let mut lock = mutex.lock().await; + if lock.1 { + // Monitor thread detected timeout and the process should be terminated soon, // nothing to do. 
+ let _ = handle.join(); continue } + lock.1 = true; - send_response(&mut stream, response).await?; + send_response(&mut lock.0, response).await?; } }); } diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 0f2e2b839a80..9513e8dc2008 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -28,7 +28,6 @@ use crate::{ prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, }; use always_assert::never; -use async_std::path::{Path, PathBuf}; use futures::{ channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt, @@ -36,6 +35,7 @@ use futures::{ use polkadot_parachain::primitives::ValidationResult; use std::{ collections::HashMap, + path::{Path, PathBuf}, time::{Duration, SystemTime}, }; @@ -171,7 +171,7 @@ pub struct Config { impl Config { /// Create a new instance of the configuration. pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self { - // Do not contaminate the other parts of the codebase with the types from `async_std`. + // Do not contaminate the other parts of the codebase with the types from `tokio`. 
let cache_path = PathBuf::from(cache_path); let program_path = PathBuf::from(program_path); @@ -758,7 +758,7 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver) { match sweeper_rx.next().await { None => break, Some(condemned) => { - let result = async_std::fs::remove_file(&condemned).await; + let result = tokio::fs::remove_file(&condemned).await; gum::trace!( target: LOG_TARGET, ?result, @@ -808,7 +808,7 @@ mod tests { const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); - #[async_std::test] + #[tokio::test] async fn pulse_test() { let pulse = pulse_every(Duration::from_millis(100)); futures::pin_mut!(pulse); @@ -998,11 +998,11 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn shutdown_on_handle_drop() { let test = Builder::default().build(); - let join_handle = async_std::task::spawn(test.run); + let join_handle = tokio::task::spawn(test.run); // Dropping the handle will lead to conclusion of the read part and thus will make the event // loop to stop, which in turn will resolve the join handle. @@ -1010,7 +1010,7 @@ mod tests { join_handle.await; } - #[async_std::test] + #[tokio::test] async fn pruning() { let mock_now = SystemTime::now() - Duration::from_millis(1000); @@ -1040,7 +1040,7 @@ mod tests { test.poll_ensure_to_sweeper_is_empty().await; } - #[async_std::test] + #[tokio::test] async fn execute_pvf_requests() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1140,7 +1140,7 @@ mod tests { ); } - #[async_std::test] + #[tokio::test] async fn precheck_pvf() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1195,7 +1195,7 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn test_prepare_done() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1282,7 +1282,7 @@ mod tests { // Test that multiple prechecking requests do not trigger preparation retries if the first one // failed. 
- #[async_std::test] + #[tokio::test] async fn test_precheck_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1325,7 +1325,7 @@ mod tests { // Test that multiple execution requests trigger preparation retries if the first one failed due // to a potentially non-reproducible error. - #[async_std::test] + #[tokio::test] async fn test_execute_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1395,7 +1395,7 @@ mod tests { // Test that multiple execution requests don't trigger preparation retries if the first one // failed due to reproducible error (e.g. Prevalidation). - #[async_std::test] + #[tokio::test] async fn test_execute_prepare_no_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1461,7 +1461,7 @@ mod tests { } // Test that multiple heads-up requests trigger preparation retries if the first one failed. - #[async_std::test] + #[tokio::test] async fn test_heads_up_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1502,7 +1502,7 @@ mod tests { ); } - #[async_std::test] + #[tokio::test] async fn cancellation() { let mut test = Builder::default().build(); let mut host = test.host_handle(); diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 306588eb429a..74f132fd0f9f 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -23,12 +23,17 @@ use crate::{ }; use always_assert::never; use assert_matches::assert_matches; -use async_std::path::{Path, PathBuf}; use futures::{ channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, }; use slotmap::HopSlotMap; -use std::{fmt, sync::Arc, task::Poll, time::Duration}; +use std::{ + fmt, + path::{Path, PathBuf}, + sync::Arc, + task::Poll, + time::Duration, +}; slotmap::new_key_type! 
{ pub struct Worker; } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index df0a8ec41883..0bc67c79e2eb 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -19,10 +19,10 @@ use super::pool::{self, Worker}; use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; use always_assert::{always, never}; -use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; use std::{ collections::{HashMap, VecDeque}, + path::PathBuf, time::Duration, }; @@ -603,7 +603,7 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn properly_concludes() { let mut test = Test::new(2, 2); @@ -625,7 +625,7 @@ mod tests { assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } - #[async_std::test] + #[tokio::test] async fn dont_spawn_over_soft_limit_unless_critical() { let mut test = Test::new(2, 3); let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; @@ -669,7 +669,7 @@ mod tests { assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); } - #[async_std::test] + #[tokio::test] async fn cull_unwanted() { let mut test = Test::new(1, 2); let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; @@ -707,7 +707,7 @@ mod tests { assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); } - #[async_std::test] + #[tokio::test] async fn worker_mass_die_out_doesnt_stall_queue() { let mut test = Test::new(2, 2); @@ -741,7 +741,7 @@ mod tests { assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } - #[async_std::test] + #[tokio::test] async fn doesnt_resurrect_ripped_worker_if_no_work() { let mut test = Test::new(2, 2); @@ -766,7 +766,7 @@ mod tests { test.poll_ensure_to_pool_is_empty().await; } - #[async_std::test] + #[tokio::test] async fn rip_for_start_work() { let mut test = Test::new(2, 2); diff --git a/node/core/pvf/src/prepare/worker.rs 
b/node/core/pvf/src/prepare/worker.rs index 91361eacaf26..4714bd97bbb8 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -24,24 +24,17 @@ use crate::{ }, LOG_TARGET, }; -use async_std::{ - io, - os::unix::net::UnixStream, - path::{Path, PathBuf}, - task, -}; use cpu_time::ProcessTime; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{ panic, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + path::{Path, PathBuf}, + sync::Arc, thread, time::Duration, }; +use tokio::{io, net::UnixStream, runtime::Runtime, sync::Mutex}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -118,7 +111,7 @@ pub async fn start_work( // load, but the CPU resources of the child can only be measured from the parent after the // child process terminates. let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; - let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await; + let result = tokio::time::timeout(timeout, framed_recv(&mut stream)).await; let selected = match result { // Received bytes from worker within the time limit. @@ -219,7 +212,7 @@ async fn handle_response_bytes( artifact_path.display(), ); - async_std::fs::rename(&tmp_file, &artifact_path) + tokio::fs::rename(&tmp_file, &artifact_path) .await .map(|_| Selected::Done(result)) .unwrap_or_else(|err| { @@ -264,7 +257,7 @@ where // // In any case, we try to remove the file here so that there are no leftovers. We only report // errors that are different from the `NotFound`. - match async_std::fs::remove_file(tmp_file).await { + match tokio::fs::remove_file(tmp_file).await { Ok(()) => (), Err(err) if err.kind() == std::io::ErrorKind::NotFound => (), Err(err) => { @@ -314,59 +307,61 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf, /// The entrypoint that the spawned prepare worker should start with. 
The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { - worker_event_loop("prepare", socket_path, |mut stream| async move { + worker_event_loop("prepare", socket_path, |stream| async move { + let mutex = Arc::new(Mutex::new((stream, false))); loop { - let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; - + let (code, dest, preparation_timeout) = { + let mut lock = mutex.lock().await; + // Unset the lock flag. We set it when either thread finishes. + lock.1 = false; + recv_request(&mut lock.0).await? + }; gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), "worker: preparing artifact", ); - // Create a lock flag. We set it when either thread finishes. - let lock = Arc::new(AtomicBool::new(false)); let cpu_time_start = ProcessTime::now(); // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from // sleeping and then either sleeps for the remaining CPU time, or kills the process if // we exceed the CPU timeout. - let (stream_2, cpu_time_start_2, preparation_timeout_2, lock_2) = - (stream.clone(), cpu_time_start, preparation_timeout, lock.clone()); + let (mutex_2, cpu_time_start_2, preparation_timeout_2) = + (mutex.clone(), cpu_time_start, preparation_timeout); let handle = thread::Builder::new().name("CPU time monitor".into()).spawn(move || { - task::block_on(async { + let rt = Runtime::new().unwrap(); + rt.block_on(async { cpu_time_monitor_loop( JobKind::Prepare, - stream_2, + mutex_2, cpu_time_start_2, preparation_timeout_2, - lock_2, ) .await; }) })?; // Prepares the artifact in a separate thread. - let result = match prepare_artifact(&code).await { + let compilation_result = prepare_artifact(&code).await; + let cpu_time_elapsed = cpu_time_start.elapsed(); + + let mut lock = mutex.lock().await; + if lock.1 { + // Monitor thread detected timeout and the process should be terminated soon, + // nothing to do. 
+ let _ = handle.join(); + continue + } + lock.1 = true; + + let result = match compilation_result { Err(err) => { // Serialized error will be written into the socket. Err(err) }, Ok(compiled_artifact) => { - let cpu_time_elapsed = cpu_time_start.elapsed(); - - let lock_result = - lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if lock_result.is_err() { - // The other thread is still sending an error response over the socket. Wait on it and - // return. - let _ = handle.join(); - // Monitor thread detected timeout and likely already terminated the - // process, nothing to do. - continue - } - // Write the serialized artifact into a temp file. // // PVF host only keeps artifacts statuses in its memory, successfully compiled code gets stored @@ -379,13 +374,13 @@ pub fn worker_entrypoint(socket_path: &str) { "worker: writing artifact to {}", dest.display(), ); - async_std::fs::write(&dest, &compiled_artifact).await?; + tokio::fs::write(&dest, &compiled_artifact).await?; Ok(cpu_time_elapsed) }, }; - framed_send(&mut stream, result.encode().as_slice()).await?; + framed_send(&mut lock.0, result.encode().as_slice()).await?; } }); } diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index f9eaf42dcf67..f0dac2bed2fb 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -17,29 +17,27 @@ //! Common logic for implementation of worker processes. 
use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET}; -use async_std::{ - io, - os::unix::net::{UnixListener, UnixStream}, - path::{Path, PathBuf}, -}; use cpu_time::ProcessTime; -use futures::{ - never::Never, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, FutureExt as _, -}; +use futures::{never::Never, FutureExt as _}; use futures_timer::Delay; use parity_scale_codec::Encode; use pin_project::pin_project; use rand::Rng; use std::{ fmt, mem, + path::{Path, PathBuf}, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, task::{Context, Poll}, time::Duration, }; +use tokio::{ + io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}, + net::{UnixListener, UnixStream}, + process, + runtime::Runtime, + sync::Mutex, +}; /// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in /// wall clock time). This is lenient because CPU time may go slower than wall clock time. @@ -76,7 +74,7 @@ pub async fn spawn_with_program_path( with_transient_socket_path(debug_id, |socket_path| { let socket_path = socket_path.to_owned(); async move { - let listener = UnixListener::bind(&socket_path).await.map_err(|err| { + let listener = UnixListener::bind(&socket_path).map_err(|err| { gum::warn!( target: LOG_TARGET, %debug_id, @@ -131,7 +129,7 @@ where // Best effort to remove the socket file. Under normal circumstances the socket will be removed // by the worker. We make sure that it is removed here, just in case a failed rendezvous. 
- let _ = async_std::fs::remove_file(socket_path).await; + let _ = tokio::fs::remove_file(socket_path).await; result } @@ -162,7 +160,7 @@ pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result { for _ in 0..NUM_RETRIES { let candidate_path = tmppath(prefix, dir); - if !candidate_path.exists().await { + if !candidate_path.exists() { return Ok(candidate_path) } } @@ -181,13 +179,15 @@ where F: FnMut(UnixStream) -> Fut, Fut: futures::Future>, { - let err = async_std::task::block_on::<_, io::Result>(async move { - let stream = UnixStream::connect(socket_path).await?; - let _ = async_std::fs::remove_file(socket_path).await; + let rt = Runtime::new().unwrap(); + let err = rt + .block_on(async move { + let stream = UnixStream::connect(socket_path).await?; + let _ = tokio::fs::remove_file(socket_path).await; - event_loop(stream).await - }) - .unwrap_err(); // it's never `Ok` because it's `Ok(Never)` + event_loop(stream).await + }) + .unwrap_err(); // it's never `Ok` because it's `Ok(Never)` gum::debug!( target: LOG_TARGET, @@ -206,64 +206,65 @@ where /// background. When it wakes, it will see that the flag has been set and return. pub async fn cpu_time_monitor_loop( job_kind: JobKind, - mut stream: UnixStream, + mutex: Arc>, cpu_time_start: ProcessTime, timeout: Duration, - lock: Arc, ) { loop { let cpu_time_elapsed = cpu_time_start.elapsed(); // Treat the timeout as CPU time, which is less subject to variance due to load. - if cpu_time_elapsed > timeout { - let result = lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if result.is_err() { - // Hit the job-completed case first, return from this thread. - return - } + if cpu_time_elapsed <= timeout { + // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep + // is wall clock time. The CPU clock may be slower than the wall clock. 
+ let sleep_interval = timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD; + std::thread::sleep(sleep_interval); + continue + } - // Log if we exceed the timeout. + let mut lock = mutex.lock().await; + if lock.1 { + // Hit the job-completed case first, return from this thread. + return + } + lock.1 = true; + + // Log if we exceed the timeout. + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms", + cpu_time_elapsed.as_millis(), + timeout.as_millis(), + ); + + // Send back a `TimedOut` error. + // + // NOTE: This will cause the worker, whether preparation or execution, to be killed by the + // host. We do not kill the process here because it would interfere with the proper handling + // of this error. + let encoded_result = match job_kind { + JobKind::Prepare => { + let result: Result<(), PrepareError> = Err(PrepareError::TimedOut); + result.encode() + }, + JobKind::Execute => { + let result = ExecuteResponse::TimedOut; + result.encode() + }, + }; + // If we error here there is nothing we can do apart from log it. The receiving side will + // just have to time out. + if let Err(err) = framed_send(&mut lock.0, encoded_result.as_slice()).await { gum::warn!( target: LOG_TARGET, worker_pid = %std::process::id(), - "{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms", - cpu_time_elapsed.as_millis(), - timeout.as_millis(), + "{job_kind} worker -> pvf host: error sending result over the socket: {:?}", + err ); - - // Send back a `TimedOut` error. - // - // NOTE: This will cause the worker, whether preparation or execution, to be killed by - // the host. We do not kill the process here because it would interfere with the proper - // handling of this error. 
- let encoded_result = match job_kind { - JobKind::Prepare => { - let result: Result<(), PrepareError> = Err(PrepareError::TimedOut); - result.encode() - }, - JobKind::Execute => { - let result = ExecuteResponse::TimedOut; - result.encode() - }, - }; - // If we error here there is nothing we can do apart from log it. The receiving side - // will just have to time out. - if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "{job_kind} worker -> pvf host: error sending result over the socket: {:?}", - err - ); - } - - return } - // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep - // is wall clock time. The CPU clock may be slower than the wall clock. - let sleep_interval = timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD; - std::thread::sleep(sleep_interval); + return } } @@ -304,9 +305,10 @@ pub enum SpawnErr { /// This future relies on the fact that a child process's stdout `fd` is closed upon it's termination. 
#[pin_project] pub struct WorkerHandle { - child: async_process::Child, + child: process::Child, + child_id: u32, #[pin] - stdout: async_process::ChildStdout, + stdout: process::ChildStdout, program: PathBuf, drop_box: Box<[u8]>, } @@ -317,13 +319,16 @@ impl WorkerHandle { extra_args: &[&str], socket_path: impl AsRef, ) -> io::Result { - let mut child = async_process::Command::new(program.as_ref()) + let mut child = process::Command::new(program.as_ref()) .args(extra_args) .arg(socket_path.as_ref().as_os_str()) - .stdout(async_process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) .kill_on_drop(true) .spawn()?; + let child_id = child + .id() + .ok_or(io::Error::new(io::ErrorKind::Other, "could not get id of spawned process"))?; let stdout = child .stdout .take() @@ -331,6 +336,7 @@ impl WorkerHandle { Ok(WorkerHandle { child, + child_id, stdout, program: program.as_ref().to_path_buf(), // We don't expect the bytes to be ever read. But in case we do, we should not use a buffer @@ -348,7 +354,7 @@ impl WorkerHandle { /// Returns the process id of this worker. pub fn id(&self) -> u32 { - self.child.id() + self.child_id } } @@ -357,25 +363,35 @@ impl futures::Future for WorkerHandle { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut *me.drop_box)) { - Ok(0) => { - // 0 means `EOF` means the child was terminated. Resolve. - Poll::Ready(()) - }, - Ok(_bytes_read) => { - // weird, we've read something. Pretend that never happened and reschedule ourselves. - cx.waker().wake_by_ref(); - Poll::Pending + // Create a `ReadBuf` here instead of storing it in `WorkerHandle` to avoid a lifetime + // parameter on `WorkerHandle`. Creating the `ReadBuf` is fairly cheap. 
+ let mut read_buf = ReadBuf::new(&mut *me.drop_box); + match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut read_buf)) { + Ok(()) => { + if read_buf.filled().len() > 0 { + // weird, we've read something. Pretend that never happened and reschedule + // ourselves. + cx.waker().wake_by_ref(); + Poll::Pending + } else { + // Nothing read means `EOF` means the child was terminated. Resolve. + Poll::Ready(()) + } }, Err(err) => { // The implementation is guaranteed to not to return `WouldBlock` and Interrupted. This // leaves us with legit errors which we suppose were due to termination. // Log the status code. + let code = if let Ok(Some(code)) = me.child.try_wait() { + format!("{}", code) + } else { + "none".into() + }; gum::debug!( target: LOG_TARGET, - worker_pid = %me.child.id(), - status_code = ?me.child.try_status(), + worker_pid = %me.child_id, + status_code = ?code, "pvf worker ({}): {:?}", me.program.display(), err, diff --git a/node/core/pvf/tests/it/adder.rs b/node/core/pvf/tests/it/adder.rs index 69b6b7d21979..8eb57e4d9026 100644 --- a/node/core/pvf/tests/it/adder.rs +++ b/node/core/pvf/tests/it/adder.rs @@ -22,7 +22,7 @@ use polkadot_parachain::primitives::{ ValidationParams, }; -#[async_std::test] +#[tokio::test] async fn execute_good_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; @@ -50,7 +50,7 @@ async fn execute_good_block_on_parent() { assert_eq!(new_head.post_state, hash_state(512)); } -#[async_std::test] +#[tokio::test] async fn execute_good_chain_on_parent() { let mut number = 0; let mut parent_hash = [0; 32]; @@ -88,7 +88,7 @@ async fn execute_good_chain_on_parent() { } } -#[async_std::test] +#[tokio::test] async fn execute_bad_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; @@ -113,7 +113,7 @@ async fn execute_bad_block_on_parent() { .unwrap_err(); } -#[async_std::test] +#[tokio::test] async fn stress_spawn() { 
let host = std::sync::Arc::new(TestHost::new()); diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index a6aaf5d369d4..cacbedb75c0a 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -14,13 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use async_std::sync::Mutex; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; use std::time::Duration; +use tokio::sync::Mutex; mod adder; mod worker_common; @@ -47,7 +47,7 @@ impl TestHost { let mut config = Config::new(cache_dir.path().to_owned(), program_path); f(&mut config); let (host, task) = start(config, Metrics::default()); - let _ = async_std::task::spawn(task); + let _ = tokio::task::spawn(task); Self { _cache_dir: cache_dir, host: Mutex::new(host) } } @@ -77,7 +77,7 @@ impl TestHost { } } -#[async_std::test] +#[tokio::test] async fn terminates_on_timeout() { let host = TestHost::new(); @@ -99,7 +99,7 @@ async fn terminates_on_timeout() { } } -#[async_std::test] +#[tokio::test] async fn parallel_execution() { // Run some jobs that do not complete, thus timing out. 
let host = TestHost::new(); @@ -136,7 +136,7 @@ async fn parallel_execution() { ); } -#[async_std::test] +#[tokio::test] async fn execute_queue_doesnt_stall_if_workers_died() { let host = TestHost::new_with_config(|cfg| { cfg.execute_workers_max_num = 5; diff --git a/node/core/pvf/tests/it/worker_common.rs b/node/core/pvf/tests/it/worker_common.rs index 464b80a9fe58..7e00d005df19 100644 --- a/node/core/pvf/tests/it/worker_common.rs +++ b/node/core/pvf/tests/it/worker_common.rs @@ -18,7 +18,7 @@ use crate::PUPPET_EXE; use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr}; use std::time::Duration; -#[async_std::test] +#[tokio::test] async fn spawn_timeout() { let result = spawn_with_program_path("integration-test", PUPPET_EXE, &["sleep"], Duration::from_secs(2)) @@ -26,7 +26,7 @@ async fn spawn_timeout() { assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); } -#[async_std::test] +#[tokio::test] async fn should_connect() { let _ = spawn_with_program_path( "integration-test", From 3abc804a96d84ffe8a053484bae535b57cb1f8f7 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 13 Dec 2022 06:11:10 -0500 Subject: [PATCH 02/18] Rework workers to use `select!` instead of a mutex The improvement in code readability is more important than the thread overhead. 
--- node/core/pvf/src/execute/worker.rs | 91 ++++++++++---------- node/core/pvf/src/host.rs | 2 +- node/core/pvf/src/prepare/worker.rs | 127 ++++++++++++++-------------- node/core/pvf/src/worker_common.rs | 65 +++++--------- node/core/pvf/tests/it/main.rs | 12 ++- 5 files changed, 139 insertions(+), 158 deletions(-) diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index eb9f5dcb2a06..c0a64af55e47 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -31,11 +31,13 @@ use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationResult; use std::{ path::{Path, PathBuf}, - sync::Arc, - thread, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, time::Duration, }; -use tokio::{io, net::UnixStream, runtime::Runtime, sync::Mutex}; +use tokio::{io, net::UnixStream, select}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -228,19 +230,13 @@ impl Response { /// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { - worker_event_loop("execute", socket_path, |stream| async move { - let executor = Executor::new().map_err(|e| { + worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move { + let executor = Arc::new(Executor::new().map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) - })?; - let mutex = Arc::new(Mutex::new((stream, false))); + })?); loop { - let (artifact_path, params, execution_timeout) = { - let mut lock = mutex.lock().await; - // Unset the lock flag. We set it when either thread finishes. - lock.1 = false; - recv_request(&mut lock.0).await? 
- }; + let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), @@ -248,48 +244,53 @@ pub fn worker_entrypoint(socket_path: &str) { artifact_path.display(), ); + // Flag used only to signal to the cpu time monitor thread that it can finish. + let finished_flag = Arc::new(AtomicBool::new(false)); let cpu_time_start = ProcessTime::now(); - // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from - // sleeping and then either sleeps for the remaining CPU time, or kills the process if - // we exceed the CPU timeout. - let (mutex_2, cpu_time_start_2, execution_timeout_2) = - (mutex.clone(), cpu_time_start, execution_timeout); - let handle = - thread::Builder::new().name("CPU time monitor".into()).spawn(move || { - let rt = Runtime::new().unwrap(); - rt.block_on(async { - cpu_time_monitor_loop( - JobKind::Execute, - mutex_2, - cpu_time_start_2, - execution_timeout_2, - ) - .await; - }) - })?; - - let response = - validate_using_artifact(&artifact_path, &params, &executor, cpu_time_start).await; + // Spawn a new thread that runs the CPU time monitor. + let finished_flag_2 = finished_flag.clone(); + let thread_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop( + JobKind::Execute, + cpu_time_start, + execution_timeout, + finished_flag_2, + ); + }) + .fuse(); + let executor_2 = executor.clone(); + let execute_fut = rt_handle + .spawn_blocking(move || { + validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start) + }) + .fuse(); - let mut lock = mutex.lock().await; - if lock.1 { - // Monitor thread detected timeout and the process should be terminated soon, - // nothing to do. - let _ = handle.join(); - continue - } - lock.1 = true; + let response = select! { + // If this future is not selected, the join handle is dropped and the thread will + // finish in the background.
+ join_res = thread_fut => { + match join_res { + Ok(()) => Response::TimedOut, + Err(e) => Response::InternalError(format!("{}", e)), + } + }, + execute_res = execute_fut => { + finished_flag.store(true, Ordering::Relaxed); + execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e))) + }, + }; - send_response(&mut lock.0, response).await?; + send_response(&mut stream, response).await?; } }); } -async fn validate_using_artifact( +fn validate_using_artifact( artifact_path: &Path, params: &[u8], - executor: &Executor, + executor: Arc<Executor>, cpu_time_start: ProcessTime, ) -> Response { let descriptor_bytes = match unsafe { diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 9513e8dc2008..7a032320e211 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -1007,7 +1007,7 @@ mod tests { // Dropping the handle will lead to conclusion of the read part and thus will make the event // loop to stop, which in turn will resolve the join handle. drop(test.to_host_tx); - join_handle.await; + join_handle.await.unwrap(); } #[tokio::test] diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 4714bd97bbb8..b8744260dc6b 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -25,16 +25,19 @@ use crate::{ LOG_TARGET, }; use cpu_time::ProcessTime; +use futures::FutureExt; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{ panic, path::{Path, PathBuf}, - sync::Arc, - thread, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, time::Duration, }; -use tokio::{io, net::UnixStream, runtime::Runtime, sync::Mutex}; +use tokio::{io, net::UnixStream, select}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -307,85 +310,79 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf, /// The entrypoint that the spawned prepare worker should start with.
The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { - worker_event_loop("prepare", socket_path, |stream| async move { - let mutex = Arc::new(Mutex::new((stream, false))); + worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { - let (code, dest, preparation_timeout) = { - let mut lock = mutex.lock().await; - // Unset the lock flag. We set it when either thread finishes. - lock.1 = false; - recv_request(&mut lock.0).await? - }; + let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), "worker: preparing artifact", ); + // Flag used only to signal to the cpu time monitor thread that it can finish. + let finished_flag = Arc::new(AtomicBool::new(false)); let cpu_time_start = ProcessTime::now(); - // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from - // sleeping and then either sleeps for the remaining CPU time, or kills the process if - // we exceed the CPU timeout. - let (mutex_2, cpu_time_start_2, preparation_timeout_2) = - (mutex.clone(), cpu_time_start, preparation_timeout); - let handle = - thread::Builder::new().name("CPU time monitor".into()).spawn(move || { - let rt = Runtime::new().unwrap(); - rt.block_on(async { - cpu_time_monitor_loop( - JobKind::Prepare, - mutex_2, - cpu_time_start_2, - preparation_timeout_2, - ) - .await; - }) - })?; - - // Prepares the artifact in a separate thread. - let compilation_result = prepare_artifact(&code).await; - let cpu_time_elapsed = cpu_time_start.elapsed(); - - let mut lock = mutex.lock().await; - if lock.1 { - // Monitor thread detected timeout and the process should be terminated soon, - // nothing to do. - let _ = handle.join(); - continue - } - lock.1 = true; - - let result = match compilation_result { - Err(err) => { - // Serialized error will be written into the socket. 
- Err(err) + // Spawn a new thread that runs the CPU time monitor. + let finished_flag_2 = finished_flag.clone(); + let thread_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop( + JobKind::Prepare, + cpu_time_start, + preparation_timeout, + finished_flag_2, + ) + }) + .fuse(); + let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); + + let result = select! { + // If this future is not selected, the join handle is dropped and the thread will + // finish in the background. + join_res = thread_fut => { + match join_res { + Ok(()) => Err(PrepareError::TimedOut), + Err(_) => Err(PrepareError::DidNotMakeIt), + } }, - Ok(compiled_artifact) => { - // Write the serialized artifact into a temp file. - // - // PVF host only keeps artifacts statuses in its memory, successfully compiled code gets stored - // on the disk (and consequently deserialized by execute-workers). The prepare worker is only - // required to send `Ok` to the pool to indicate the success. - - gum::debug!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: writing artifact to {}", - dest.display(), - ); - tokio::fs::write(&dest, &compiled_artifact).await?; - - Ok(cpu_time_elapsed) + compilation_res = prepare_fut => { + let cpu_time_elapsed = cpu_time_start.elapsed(); + finished_flag.store(true, Ordering::Relaxed); + + match compilation_res.unwrap_or_else(|_| Err(PrepareError::DidNotMakeIt)) { + Err(err) => { + // Serialized error will be written into the socket. + Err(err) + }, + Ok(compiled_artifact) => { + // Write the serialized artifact into a temp file. + // + // PVF host only keeps artifacts statuses in its memory, successfully + // compiled code gets stored on the disk (and consequently deserialized + // by execute-workers). The prepare worker is only required to send `Ok` + // to the pool to indicate the success. 
+ + gum::debug!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: writing artifact to {}", + dest.display(), + ); + tokio::fs::write(&dest, &compiled_artifact).await?; + + Ok(cpu_time_elapsed) + }, + } }, }; - framed_send(&mut lock.0, result.encode().as_slice()).await?; + framed_send(&mut stream, result.encode().as_slice()).await?; } }); } -async fn prepare_artifact(code: &[u8]) -> Result { +fn prepare_artifact(code: &[u8]) -> Result { panic::catch_unwind(|| { let blob = match crate::executor_intf::prevalidate(code) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index f0dac2bed2fb..1b38a7c35484 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -16,18 +16,20 @@ //! Common logic for implementation of worker processes. -use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET}; +use crate::LOG_TARGET; use cpu_time::ProcessTime; use futures::{never::Never, FutureExt as _}; use futures_timer::Delay; -use parity_scale_codec::Encode; use pin_project::pin_project; use rand::Rng; use std::{ fmt, mem, path::{Path, PathBuf}, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, task::{Context, Poll}, time::Duration, }; @@ -35,8 +37,7 @@ use tokio::{ io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}, net::{UnixListener, UnixStream}, process, - runtime::Runtime, - sync::Mutex, + runtime::{Handle, Runtime}, }; /// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in @@ -176,16 +177,17 @@ pub async fn tmpfile(prefix: &str) -> io::Result { pub fn worker_event_loop(debug_id: &'static str, socket_path: &str, mut event_loop: F) where - F: FnMut(UnixStream) -> Fut, + F: FnMut(Handle, UnixStream) -> Fut, Fut: futures::Future>, { - let rt = Runtime::new().unwrap(); + let rt = Runtime::new().expect("Creates tokio 
runtime"); + let handle = rt.handle(); let err = rt .block_on(async move { let stream = UnixStream::connect(socket_path).await?; let _ = tokio::fs::remove_file(socket_path).await; - event_loop(stream).await + event_loop(handle.clone(), stream).await }) .unwrap_err(); // it's never `Ok` because it's `Ok(Never)` @@ -199,36 +201,30 @@ where } /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up -/// from sleeping and then either sleeps for the remaining CPU time, or sends back a timeout error -/// if we exceed the CPU timeout. -/// -/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the -/// background. When it wakes, it will see that the flag has been set and return. -pub async fn cpu_time_monitor_loop( +/// from sleeping and then either sleeps for the remaining CPU time, or returns if we exceed the CPU +/// timeout. +pub fn cpu_time_monitor_loop( job_kind: JobKind, - mutex: Arc>, cpu_time_start: ProcessTime, timeout: Duration, + finished_flag: Arc, ) { loop { + if finished_flag.load(Ordering::Relaxed) { + return + } + let cpu_time_elapsed = cpu_time_start.elapsed(); // Treat the timeout as CPU time, which is less subject to variance due to load. if cpu_time_elapsed <= timeout { // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep // is wall clock time. The CPU clock may be slower than the wall clock. - let sleep_interval = timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD; + let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD; std::thread::sleep(sleep_interval); continue } - let mut lock = mutex.lock().await; - if lock.1 { - // Hit the job-completed case first, return from this thread. - return - } - lock.1 = true; - // Log if we exceed the timeout. gum::warn!( target: LOG_TARGET, @@ -238,32 +234,11 @@ pub async fn cpu_time_monitor_loop( timeout.as_millis(), ); - // Send back a `TimedOut` error. 
+ // Return, signaling that we should send a `TimedOut` error to the host. // // NOTE: This will cause the worker, whether preparation or execution, to be killed by the // host. We do not kill the process here because it would interfere with the proper handling // of this error. - let encoded_result = match job_kind { - JobKind::Prepare => { - let result: Result<(), PrepareError> = Err(PrepareError::TimedOut); - result.encode() - }, - JobKind::Execute => { - let result = ExecuteResponse::TimedOut; - result.encode() - }, - }; - // If we error here there is nothing we can do apart from log it. The receiving side will - // just have to time out. - if let Err(err) = framed_send(&mut lock.0, encoded_result.as_slice()).await { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "{job_kind} worker -> pvf host: error sending result over the socket: {:?}", - err - ); - } - return } } diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index cacbedb75c0a..68916be42890 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, @@ -100,7 +101,7 @@ async fn terminates_on_timeout() { } #[tokio::test] -async fn parallel_execution() { +async fn ensure_parallel_execution() { // Run some jobs that do not complete, thus timing out. 
let host = TestHost::new(); let execute_pvf_future_1 = host.validate_candidate( @@ -123,7 +124,14 @@ async fn parallel_execution() { ); let start = std::time::Instant::now(); - let (_, _) = futures::join!(execute_pvf_future_1, execute_pvf_future_2); + let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2); + assert_matches!( + (res1, res2), + ( + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)), + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) + ) + ); // Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel). let duration = std::time::Instant::now().duration_since(start); From 5fcc67d4ee3251186c3f9dd61a3ee29ba0f4bfec Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 13 Dec 2022 06:22:12 -0500 Subject: [PATCH 03/18] Remove unnecessary `fuse` --- node/core/pvf/src/execute/worker.rs | 26 +++++++++++--------------- node/core/pvf/src/prepare/worker.rs | 21 +++++++++------------ 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index c0a64af55e47..305d7af73dcc 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -250,22 +250,18 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn a new thread that runs the CPU time monitor. 
let finished_flag_2 = finished_flag.clone(); - let thread_fut = rt_handle - .spawn_blocking(move || { - cpu_time_monitor_loop( - JobKind::Execute, - cpu_time_start, - execution_timeout, - finished_flag_2, - ); - }) - .fuse(); + let thread_fut = rt_handle.spawn_blocking(move || { + cpu_time_monitor_loop( + JobKind::Execute, + cpu_time_start, + execution_timeout, + finished_flag_2, + ); + }); let executor_2 = executor.clone(); - let execute_fut = rt_handle - .spawn_blocking(move || { - validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start) - }) - .fuse(); + let execute_fut = rt_handle.spawn_blocking(move || { + validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start) + }); let response = select! { // If this future is not selected, the join handle is dropped and the thread will diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index b8744260dc6b..442ce8c8a44c 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -25,7 +25,6 @@ use crate::{ LOG_TARGET, }; use cpu_time::ProcessTime; -use futures::FutureExt; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{ @@ -325,17 +324,15 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn a new thread that runs the CPU time monitor. let finished_flag_2 = finished_flag.clone(); - let thread_fut = rt_handle - .spawn_blocking(move || { - cpu_time_monitor_loop( - JobKind::Prepare, - cpu_time_start, - preparation_timeout, - finished_flag_2, - ) - }) - .fuse(); - let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); + let thread_fut = rt_handle.spawn_blocking(move || { + cpu_time_monitor_loop( + JobKind::Prepare, + cpu_time_start, + preparation_timeout, + finished_flag_2, + ) + }); + let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)); let result = select!
{ // If this future is not selected, the join handle is dropped and the thread will From 451fae013a638e0508b014777f337743de9a2164 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 13 Dec 2022 07:58:05 -0500 Subject: [PATCH 04/18] Add explanation for `expect()` --- node/core/pvf/src/worker_common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 1b38a7c35484..58830d119a80 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -180,7 +180,7 @@ where F: FnMut(Handle, UnixStream) -> Fut, Fut: futures::Future>, { - let rt = Runtime::new().expect("Creates tokio runtime"); + let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); let handle = rt.handle(); let err = rt .block_on(async move { From fc4c28b7676395f4d6ca0a99e2c0a765c4bb685f Mon Sep 17 00:00:00 2001 From: Marcin S Date: Sun, 18 Dec 2022 08:22:29 -0500 Subject: [PATCH 05/18] Update node/core/pvf/src/worker_common.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- node/core/pvf/src/worker_common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 58830d119a80..6c475825188f 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -366,7 +366,7 @@ impl futures::Future for WorkerHandle { gum::debug!( target: LOG_TARGET, worker_pid = %me.child_id, - status_code = ?code, + status_code = ?me.child.try_wait().ok().flatten().map(|c| c.to_string()), "pvf worker ({}): {:?}", me.program.display(), err, From 1dde78b70a32272b1042e66554b561f44dd67e46 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Sun, 18 Dec 2022 08:22:42 -0500 Subject: [PATCH 06/18] Update node/core/pvf/src/worker_common.rs MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- node/core/pvf/src/worker_common.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 6c475825188f..e88ee6b992b8 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -358,11 +358,6 @@ impl futures::Future for WorkerHandle { // leaves us with legit errors which we suppose were due to termination. // Log the status code. - let code = if let Ok(Some(code)) = me.child.try_wait() { - format!("{}", code) - } else { - "none".into() - }; gum::debug!( target: LOG_TARGET, worker_pid = %me.child_id, From da31a4886706f952e883bfde5efd5e267a7be87a Mon Sep 17 00:00:00 2001 From: Marcin S Date: Sun, 18 Dec 2022 09:15:54 -0500 Subject: [PATCH 07/18] Address some review comments --- node/core/pvf/src/execute/worker.rs | 35 +++++++++++++++++------------ node/core/pvf/src/lib.rs | 1 + node/core/pvf/src/prepare/worker.rs | 30 +++++++++++++++---------- node/core/pvf/src/worker_common.rs | 18 ++++++--------- node/core/pvf/tests/it/main.rs | 6 +++++ 5 files changed, 53 insertions(+), 37 deletions(-) diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 305d7af73dcc..011e70ee1651 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -25,7 +25,7 @@ use crate::{ LOG_TARGET, }; use cpu_time::ProcessTime; -use futures::FutureExt; +use futures::{pin_mut, select_biased, FutureExt}; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationResult; @@ -37,7 +37,7 @@ use std::{ }, time::Duration, }; -use tokio::{io, net::UnixStream, select}; +use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. 
/// @@ -250,20 +250,27 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn a new thread that runs the CPU time monitor. let finished_flag_2 = finished_flag.clone(); - let thread_fut = rt_handle.spawn_blocking(move || { - cpu_time_monitor_loop( - JobKind::Execute, - cpu_time_start, - execution_timeout, - finished_flag_2, - ); - }); + let thread_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop( + JobKind::Execute, + cpu_time_start, + execution_timeout, + finished_flag_2, + ); + }) + .fuse(); let executor_2 = executor.clone(); - let execute_fut = rt_handle.spawn_blocking(move || { - validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start) - }); + let execute_fut = rt_handle + .spawn_blocking(move || { + validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start) + }) + .fuse(); + + pin_mut!(thread_fut); + pin_mut!(execute_fut); - let response = select! { + let response = select_biased! { // If this future is not selected, the join handle is dropped and the thread will // finish in the background.
join_res = thread_fut => { diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index 1aabb1100437..0e858147bd29 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -113,6 +113,7 @@ pub use pvf::Pvf; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; +pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR; pub use execute::worker_entrypoint as execute_worker_entrypoint; pub use prepare::worker_entrypoint as prepare_worker_entrypoint; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 442ce8c8a44c..2d1a61068394 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -25,6 +25,7 @@ use crate::{ LOG_TARGET, }; use cpu_time::ProcessTime; +use futures::{pin_mut, select_biased, FutureExt}; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{ @@ -36,7 +37,7 @@ use std::{ }, time::Duration, }; -use tokio::{io, net::UnixStream, select}; +use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -324,17 +325,22 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn a new thread that runs the CPU time monitor. let finished_flag_2 = finished_flag.clone(); - let thread_fut = rt_handle.spawn_blocking(move || { - cpu_time_monitor_loop( - JobKind::Prepare, - cpu_time_start, - preparation_timeout, - finished_flag_2, - ) - }); - let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)); - - let result = select! { + let thread_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop( + JobKind::Prepare, + cpu_time_start, + preparation_timeout, + finished_flag_2, + ) + }) + .fuse(); + let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); + + pin_mut!(thread_fut); + pin_mut!(prepare_fut); + + let result = select_biased! 
{ // If this future is not selected, the join handle is dropped and the thread will // finish in the background. join_res = thread_fut => { diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 58830d119a80..34e1e49eb189 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -203,6 +203,12 @@ where /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up /// from sleeping and then either sleeps for the remaining CPU time, or returns if we exceed the CPU /// timeout. +/// +/// Returning indicates that we should send a `TimedOut` error to the host. +/// +/// NOTE: Returning will cause the worker, whether preparation or execution, to be killed by the +/// host. We do not kill the process here because it would interfere with the proper handling of +/// this error. pub fn cpu_time_monitor_loop( job_kind: JobKind, cpu_time_start: ProcessTime, @@ -234,11 +240,6 @@ pub fn cpu_time_monitor_loop( timeout.as_millis(), ); - // Return, signaling that we should send a `TimedOut` error to the host. - // - // NOTE: This will cause the worker, whether preparation or execution, to be killed by the - // host. We do not kill the process here because it would interfere with the proper handling - // of this error. return } } @@ -358,15 +359,10 @@ impl futures::Future for WorkerHandle { // leaves us with legit errors which we suppose were due to termination. // Log the status code. 
- let code = if let Ok(Some(code)) = me.child.try_wait() { - format!("{}", code) - } else { - "none".into() - }; gum::debug!( target: LOG_TARGET, worker_pid = %me.child_id, - status_code = ?code, + status_code = ?me.child.try_wait().ok().flatten().map(|c| c.to_string()), "pvf worker ({}): {:?}", me.program.display(), err, diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index 68916be42890..07754ef8693d 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -18,6 +18,7 @@ use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; use std::time::Duration; @@ -82,6 +83,7 @@ impl TestHost { async fn terminates_on_timeout() { let host = TestHost::new(); + let start = std::time::Instant::now(); let result = host .validate_candidate( halt::wasm_binary_unwrap(), @@ -98,6 +100,10 @@ async fn terminates_on_timeout() { Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {}, r => panic!("{:?}", r), } + + let duration = std::time::Instant::now().duration_since(start); + assert!(duration >= TEST_EXECUTION_TIMEOUT); + assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR); } #[tokio::test] From e1c2cf361168db4b47511691815d79e90233a4a1 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Sun, 18 Dec 2022 17:38:44 -0500 Subject: [PATCH 08/18] Shutdown tokio runtime --- node/core/pvf/src/worker_common.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 34e1e49eb189..e608df326433 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -189,7 +189,8 @@ where event_loop(handle.clone(), stream).await }) - .unwrap_err(); 
// it's never `Ok` because it's `Ok(Never)` + // It's never `Ok` because it's `Ok(Never)`. + .unwrap_err(); gum::debug!( target: LOG_TARGET, @@ -198,6 +199,11 @@ where debug_id, err, ); + + // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast + // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, + // but may be in the future. + rt.shutdown_background(); } /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up From 077a1230ec308e04fc456da4c44750ee923f10ad Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 19 Dec 2022 06:22:15 -0500 Subject: [PATCH 09/18] Run cargo fmt --- node/core/pvf/src/worker_common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index e608df326433..9dcf2270c0b8 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -189,7 +189,7 @@ where event_loop(handle.clone(), stream).await }) - // It's never `Ok` because it's `Ok(Never)`. + // It's never `Ok` because it's `Ok(Never)`. .unwrap_err(); gum::debug!( From e0d4b9e6756be1079bf3567f9633b2b93bbc5e67 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 19 Dec 2022 11:40:21 -0500 Subject: [PATCH 10/18] Add a small note about retries --- node/core/candidate-validation/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 74610bc113ec..dd9ab2ccd3f2 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -624,7 +624,8 @@ trait ValidationBackend { self.validate_candidate(pvf.clone(), timeout, params.encode()).await; // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the - // assumption that the conditions that caused this error may have been transient. 
+ // assumption that the conditions that caused this error may have been transient. Note that + // this error is only a result of execution itself and not of preparation. if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) = validation_result { From 28d40628e9902c1ab113639ba69f0b35e2fb2f46 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 20 Dec 2022 13:50:49 -0500 Subject: [PATCH 11/18] Fix up merge --- node/core/pvf/src/prepare/pool.rs | 1 - node/core/pvf/src/prepare/worker.rs | 27 +++++++++++---------------- node/core/pvf/src/worker_common.rs | 17 ++++------------- 3 files changed, 15 insertions(+), 30 deletions(-) diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 47a62764789e..26532fc98be0 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -22,7 +22,6 @@ use crate::{ LOG_TARGET, }; use always_assert::never; -use async_std::path::{Path, PathBuf}; use futures::{ channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, }; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index bb4a1bb6cb2a..115c9c12331a 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -82,7 +82,7 @@ pub async fn start_work( artifact_path: PathBuf, preparation_timeout: Duration, ) -> Outcome { - let IdleWorker { mut stream, pid } = worker; + let IdleWorker { stream, pid } = worker; gum::debug!( target: LOG_TARGET, @@ -91,7 +91,7 @@ pub async fn start_work( artifact_path.display(), ); - with_tmp_file(stream.clone(), pid, cache_path, |tmp_file| async move { + with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await { gum::warn!( target: LOG_TARGET, @@ -194,11 +194,6 @@ async fn handle_response_bytes( preparation_timeout.as_millis(), tmp_file.display(), ); - - // Return 
a timeout error. - // - // NOTE: The artifact exists, but is located in a temporary file which - // will be cleared by `with_tmp_file`. return Outcome::TimedOut } @@ -210,10 +205,9 @@ async fn handle_response_bytes( artifact_path.display(), ); - tokio::fs::rename(&tmp_file, &artifact_path) - .await - .map(|_| Outcome::Concluded { worker, result }) - .unwrap_or_else(|err| { + match tokio::fs::rename(&tmp_file, &artifact_path).await { + Ok(()) => Outcome::Concluded { worker, result }, + Err(err) => { gum::warn!( target: LOG_TARGET, worker_pid = %pid, @@ -223,7 +217,8 @@ async fn handle_response_bytes( err, ); Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) } - }) + }, + } } /// Create a temporary file for an artifact at the given cache path and execute the given @@ -233,7 +228,7 @@ async fn handle_response_bytes( async fn with_tmp_file(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome where Fut: futures::Future, - F: FnOnce(PathBuf) -> Fut, + F: FnOnce(PathBuf, UnixStream) -> Fut, { let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await { Ok(f) => f, @@ -251,7 +246,7 @@ where }, }; - let outcome = f(tmp_file.clone()).await; + let outcome = f(tmp_file.clone(), stream).await; // The function called above is expected to move `tmp_file` to a new location upon success. However, // the function may as well fail and in that case we should remove the tmp file here. 
@@ -344,14 +339,14 @@ pub fn worker_entrypoint(socket_path: &str) { join_res = thread_fut => { match join_res { Ok(()) => Err(PrepareError::TimedOut), - Err(_) => Err(PrepareError::DidNotMakeIt), + Err(_) => Err(PrepareError::IoErr), } }, compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); finished_flag.store(true, Ordering::Relaxed); - match compilation_res.unwrap_or_else(|_| Err(PrepareError::DidNotMakeIt)) { + match compilation_res.unwrap_or_else(|_| Err(PrepareError::IoErr)) { Err(err) => { // Serialized error will be written into the socket. Err(err) diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 42ad5e113a3a..36d2bc1eccf2 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -16,7 +16,7 @@ //! Common logic for implementation of worker processes. -use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET}; +use crate::LOG_TARGET; use cpu_time::ProcessTime; use futures::{never::Never, FutureExt as _}; use futures_timer::Delay; @@ -187,21 +187,12 @@ where let stream = UnixStream::connect(socket_path).await?; let _ = tokio::fs::remove_file(socket_path).await; - let result = event_loop(handle.clone(), stream.clone()).await; - - if let Err(err) = stream.shutdown(Shutdown::Both) { - // Log, but don't return error here, as it may shadow any error from `event_loop`. - gum::debug!( - target: LOG_TARGET, - "error shutting down stream at path {}: {}", - socket_path, - err - ); - } + let result = event_loop(handle.clone(), stream).await; result }) - .unwrap_err(); // it's never `Ok` because it's `Ok(Never)` + // It's never `Ok` because it's `Ok(Never)`. 
+ .unwrap_err(); gum::debug!( target: LOG_TARGET, From 3964acad6007b2c651c7f8ec56cfbbf149eef160 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 20 Dec 2022 14:31:42 -0500 Subject: [PATCH 12/18] Rework `cpu_time_monitor_loop` to return when other thread finishes --- node/core/pvf/src/execute/worker.rs | 14 +++++--------- node/core/pvf/src/prepare/worker.rs | 14 +++++--------- node/core/pvf/src/worker_common.rs | 19 ++++++++----------- 3 files changed, 18 insertions(+), 29 deletions(-) diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 011e70ee1651..e8ecfd4f9539 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -31,10 +31,7 @@ use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationResult; use std::{ path::{Path, PathBuf}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::{mpsc::channel, Arc}, time::Duration, }; use tokio::{io, net::UnixStream}; @@ -244,19 +241,18 @@ pub fn worker_entrypoint(socket_path: &str) { artifact_path.display(), ); - // Flag used only to signal to the cpu time monitor thread that it can finish. - let finished_flag = Arc::new(AtomicBool::new(false)); + // Used to signal to the cpu time monitor thread that it can finish. + let (finished_tx, finished_rx) = channel::<()>(); let cpu_time_start = ProcessTime::now(); // Spawn a new thread that runs the CPU time monitor. 
- let finished_flag_2 = finished_flag.clone(); let thread_fut = rt_handle .spawn_blocking(move || { cpu_time_monitor_loop( JobKind::Execute, cpu_time_start, execution_timeout, - finished_flag_2, + finished_rx, ); }) .fuse(); @@ -280,7 +276,7 @@ pub fn worker_entrypoint(socket_path: &str) { } }, execute_res = execute_fut => { - finished_flag.store(true, Ordering::Relaxed); + finished_tx.send(()).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e))) }, }; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 115c9c12331a..cddd6b3f1d1a 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -31,10 +31,7 @@ use sp_core::hexdisplay::HexDisplay; use std::{ panic, path::{Path, PathBuf}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::{mpsc::channel, Arc}, time::Duration, }; use tokio::{io, net::UnixStream}; @@ -312,19 +309,18 @@ pub fn worker_entrypoint(socket_path: &str) { "worker: preparing artifact", ); - // Flag used only to signal to the cpu time monitor thread that it can finish. - let finished_flag = Arc::new(AtomicBool::new(false)); + // Used to signal to the cpu time monitor thread that it can finish. + let (finished_tx, finished_rx) = channel::<()>(); let cpu_time_start = ProcessTime::now(); // Spawn a new thread that runs the CPU time monitor. 
- let finished_flag_2 = finished_flag.clone(); let thread_fut = rt_handle .spawn_blocking(move || { cpu_time_monitor_loop( JobKind::Prepare, cpu_time_start, preparation_timeout, - finished_flag_2, + finished_rx, ) }) .fuse(); @@ -344,7 +340,7 @@ pub fn worker_entrypoint(socket_path: &str) { }, compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); - finished_flag.store(true, Ordering::Relaxed); + finished_tx.send(()).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; match compilation_res.unwrap_or_else(|_| Err(PrepareError::IoErr)) { Err(err) => { diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 36d2bc1eccf2..70258650f3e1 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -26,10 +26,7 @@ use std::{ fmt, mem, path::{Path, PathBuf}, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::mpsc::Receiver, task::{Context, Poll}, time::Duration, }; @@ -221,13 +218,9 @@ pub fn cpu_time_monitor_loop( job_kind: JobKind, cpu_time_start: ProcessTime, timeout: Duration, - finished_flag: Arc, + finished_rx: Receiver<()>, ) { loop { - if finished_flag.load(Ordering::Relaxed) { - return - } - let cpu_time_elapsed = cpu_time_start.elapsed(); // Treat the timeout as CPU time, which is less subject to variance due to load. @@ -235,8 +228,12 @@ pub fn cpu_time_monitor_loop( // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep // is wall clock time. The CPU clock may be slower than the wall clock. let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD; - std::thread::sleep(sleep_interval); - continue + match finished_rx.recv_timeout(sleep_interval) { + // Received finish signal. + Ok(()) => return, + // Timed out, restart loop. + Err(_) => continue, + } } // Log if we exceed the timeout. 
From 705751850af9cc38622211c36edbedf4997d031b Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 20 Dec 2022 14:47:58 -0500 Subject: [PATCH 13/18] Add error string to PrepareError::IoErr variant --- node/core/candidate-validation/src/lib.rs | 6 +++--- node/core/pvf/src/error.rs | 6 +++--- node/core/pvf/src/prepare/pool.rs | 4 ++-- node/core/pvf/src/prepare/worker.rs | 12 ++++++------ 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 2eaca3fd7e7e..932d100d4317 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -667,12 +667,12 @@ impl ValidationBackend for ValidationHost { async fn precheck_pvf(&mut self, pvf: Pvf) -> Result { let (tx, rx) = oneshot::channel(); - if let Err(_) = self.precheck_pvf(pvf, tx).await { + if let Err(err) = self.precheck_pvf(pvf, tx).await { // Return an IO error if there was an error communicating with the host. - return Err(PrepareError::IoErr) + return Err(PrepareError::IoErr(err)) } - let precheck_result = rx.await.or(Err(PrepareError::IoErr))?; + let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?; precheck_result } diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 01d8c78d39ca..a679b2f96062 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -34,7 +34,7 @@ pub enum PrepareError { TimedOut, /// An IO error occurred while receiving the result from the worker process. This state is reported by the /// validation host (not by the worker). - IoErr, + IoErr(String), /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the /// validation host (not by the worker). 
CreateTmpFileErr(String), @@ -54,7 +54,7 @@ impl PrepareError { use PrepareError::*; match self { Prevalidation(_) | Preparation(_) | Panic(_) => true, - TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, + TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, } } } @@ -67,7 +67,7 @@ impl fmt::Display for PrepareError { Preparation(err) => write!(f, "preparation: {}", err), Panic(err) => write!(f, "panic: {}", err), TimedOut => write!(f, "prepare: timeout"), - IoErr => write!(f, "prepare: io error while receiving response"), + IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err), CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err), } diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 26532fc98be0..0d39623c99db 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -327,14 +327,14 @@ fn handle_mux( Ok(()) }, - Outcome::IoErr => { + Outcome::IoErr(err) => { if attempt_retire(metrics, spawned, worker) { reply( from_pool, FromPool::Concluded { worker, rip: true, - result: Err(PrepareError::IoErr), + result: Err(PrepareError::IoErr(err)), }, )?; } diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index cddd6b3f1d1a..c997bf6ff479 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -64,7 +64,7 @@ pub enum Outcome { /// An IO error occurred while receiving the result from the worker process. /// /// This doesn't return an idle worker instance, thus this worker is no longer usable. 
- IoErr, + IoErr(String), } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -131,7 +131,7 @@ pub async fn start_work( "failed to recv a prepare response: {:?}", err, ); - Outcome::IoErr + Outcome::IoErr(err.to_string()) }, Err(_) => { // Timed out here on the host. @@ -162,7 +162,7 @@ async fn handle_response_bytes( // By convention we expect encoded `PrepareResult`. let result = match PrepareResult::decode(&mut response_bytes.as_slice()) { Ok(result) => result, - Err(_) => { + Err(err) => { // We received invalid bytes from the worker. let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; gum::warn!( @@ -171,7 +171,7 @@ async fn handle_response_bytes( "received unexpected response from the prepare worker: {}", HexDisplay::from(&bound_bytes), ); - return Outcome::IoErr + return Outcome::IoErr(err.to_string()) }, }; let cpu_time_elapsed = match result { @@ -335,14 +335,14 @@ pub fn worker_entrypoint(socket_path: &str) { join_res = thread_fut => { match join_res { Ok(()) => Err(PrepareError::TimedOut), - Err(_) => Err(PrepareError::IoErr), + Err(err) => Err(PrepareError::IoErr(err.to_string())), } }, compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); finished_tx.send(()).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; - match compilation_res.unwrap_or_else(|_| Err(PrepareError::IoErr)) { + match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { Err(err) => { // Serialized error will be written into the socket. 
Err(err) From e6ba0986af4c2736b0004621162f19a2d3674cb2 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 20 Dec 2022 14:59:03 -0500 Subject: [PATCH 14/18] Log when artifacts fail to prepare --- node/core/pvf/src/host.rs | 17 +++++++++++++---- node/primitives/src/lib.rs | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 33749b204c67..fbec10b6513a 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -703,10 +703,19 @@ async fn handle_prepare_done( *state = match result { Ok(cpu_time_elapsed) => ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed }, - Err(error) => ArtifactState::FailedToProcess { - last_time_failed: SystemTime::now(), - num_failures: *num_failures + 1, - error, + Err(error) => { + gum::debug!( + target: LOG_TARGET, + artifact_id = ?artifact_id, + num_failures = ?num_failures, + "Failed to process artifact: {}", + error + ); + ArtifactState::FailedToProcess { + last_time_failed: SystemTime::now(), + num_failures: *num_failures + 1, + error, + } }, }; diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index da0a0eca80be..9af65b3d601e 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -227,7 +227,7 @@ pub type UncheckedSignedFullStatement = UncheckedSigned Date: Tue, 20 Dec 2022 16:14:25 -0500 Subject: [PATCH 15/18] Fix `cpu_time_monitor_loop`; fix test --- node/core/pvf/src/prepare/queue.rs | 2 +- node/core/pvf/src/worker_common.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 5595380d55e0..c44301c7427b 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -761,7 +761,7 @@ mod tests { test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, - result: Err(PrepareError::IoErr), + result: Err(PrepareError::IoErr("test".into())), }); 
test.poll_ensure_to_pool_is_empty().await; } diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 70258650f3e1..8db61f384b64 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -26,7 +26,7 @@ use std::{ fmt, mem, path::{Path, PathBuf}, pin::Pin, - sync::mpsc::Receiver, + sync::mpsc::{Receiver, RecvTimeoutError}, task::{Context, Poll}, time::Duration, }; @@ -232,7 +232,8 @@ pub fn cpu_time_monitor_loop( // Received finish signal. Ok(()) => return, // Timed out, restart loop. - Err(_) => continue, + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => return, } } From c09377a9bcb5fe4f2ff47c10e97adaa919df1a14 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 20 Dec 2022 16:56:11 -0500 Subject: [PATCH 16/18] Fix text --- node/core/candidate-validation/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index c6003c734973..476e4ea7f985 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() { inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid); inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed); - inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed); + inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed); } From 05d1865adee9d0f6475f267e29d0ca45c305a6a8 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 22 Dec 2022 09:02:22 -0500 Subject: [PATCH 17/18] Fix a couple of potential minor data races. First data race was due to logging in the CPU monitor thread even if the job (other thread) finished. It can technically finish before or after the log. 
Maybe best would be to move this log to the `select!`s, where we are guaranteed to have chosen the timed-out branch, although there would be a bit of duplication. Also, it was possible for this thread to complete before we executed `finished_tx.send` in the other thread, which would trigger an error as the receiver has already been dropped. And right now, such a spurious error from `send` would be returned even if the job otherwise succeeded. --- node/core/pvf/src/execute/worker.rs | 24 +++++++++------ node/core/pvf/src/prepare/worker.rs | 26 ++++++++++------- node/core/pvf/src/worker_common.rs | 45 +++++++---------------------- 3 files changed, 41 insertions(+), 54 deletions(-) diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index e8ecfd4f9539..df928efaa642 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -19,7 +19,7 @@ use crate::{ executor_intf::Executor, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, worker_event_loop, IdleWorker, JobKind, SpawnErr, WorkerHandle, + spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, @@ -248,12 +248,7 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn a new thread that runs the CPU time monitor. let thread_fut = rt_handle .spawn_blocking(move || { - cpu_time_monitor_loop( - JobKind::Execute, - cpu_time_start, - execution_timeout, - finished_rx, - ); + cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) }) .fuse(); let executor_2 = executor.clone(); @@ -271,12 +266,23 @@ pub fn worker_entrypoint(socket_path: &str) { // finish in the background. join_res = thread_fut => { match join_res { - Ok(()) => Response::TimedOut, + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. 
+ gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "execute job took {}ms cpu time, exceeded execute timeout {}ms", + cpu_time_elapsed.as_millis(), + execution_timeout.as_millis(), + ); + Response::TimedOut + }, + Ok(None) => Response::InternalError("error communicating over finished channel".into()), Err(e) => Response::InternalError(format!("{}", e)), } }, execute_res = execute_fut => { - finished_tx.send(()).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; + let _ = finished_tx.send(()); execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e))) }, }; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index c997bf6ff479..d3550fe3afe6 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -19,8 +19,8 @@ use crate::{ error::{PrepareError, PrepareResult}, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, JobKind, SpawnErr, - WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, + spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; @@ -316,12 +316,7 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn a new thread that runs the CPU time monitor. let thread_fut = rt_handle .spawn_blocking(move || { - cpu_time_monitor_loop( - JobKind::Prepare, - cpu_time_start, - preparation_timeout, - finished_rx, - ) + cpu_time_monitor_loop(cpu_time_start, preparation_timeout, finished_rx) }) .fuse(); let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); @@ -334,13 +329,24 @@ pub fn worker_entrypoint(socket_path: &str) { // finish in the background. 
join_res = thread_fut => { match join_res { - Ok(()) => Err(PrepareError::TimedOut), + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", + cpu_time_elapsed.as_millis(), + preparation_timeout.as_millis(), + ); + Err(PrepareError::TimedOut) + }, + Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())), Err(err) => Err(PrepareError::IoErr(err.to_string())), } }, compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); - finished_tx.send(()).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; + let _ = finished_tx.send(()); match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { Err(err) => { diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 8db61f384b64..9cda5f8cd0b7 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -45,21 +45,6 @@ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4; /// child process. pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); -#[derive(Copy, Clone, Debug)] -pub enum JobKind { - Prepare, - Execute, -} - -impl fmt::Display for JobKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Prepare => write!(f, "prepare"), - Self::Execute => write!(f, "execute"), - } - } -} - /// This is publicly exposed only for integration tests. #[doc(hidden)] pub async fn spawn_with_program_path( @@ -206,20 +191,19 @@ where } /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up -/// from sleeping and then either sleeps for the remaining CPU time, or returns if we exceed the CPU -/// timeout. 
+/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout. /// -/// Returning indicates that we should send a `TimedOut` error to the host. +/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return +/// `None` if the other thread finishes first, without us timing out. /// -/// NOTE: Returning will cause the worker, whether preparation or execution, to be killed by the -/// host. We do not kill the process here because it would interfere with the proper handling of -/// this error. +/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or +/// execution, to be killed by the host. We do not kill the process here because it would interfere +/// with the proper handling of this error. pub fn cpu_time_monitor_loop( - job_kind: JobKind, cpu_time_start: ProcessTime, timeout: Duration, finished_rx: Receiver<()>, -) { +) -> Option { loop { let cpu_time_elapsed = cpu_time_start.elapsed(); @@ -230,23 +214,14 @@ pub fn cpu_time_monitor_loop( let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD; match finished_rx.recv_timeout(sleep_interval) { // Received finish signal. - Ok(()) => return, + Ok(()) => return None, // Timed out, restart loop. Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => return, + Err(RecvTimeoutError::Disconnected) => return None, } } - // Log if we exceed the timeout. 
- gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms", - cpu_time_elapsed.as_millis(), - timeout.as_millis(), - ); - - return + return Some(cpu_time_elapsed) } } From 0f4ac06d7109dbeee2a69806e369e0333ac7f819 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 9 Jan 2023 07:59:59 -0500 Subject: [PATCH 18/18] Update Cargo.lock --- Cargo.lock | 70 ------------------------------------------------------ 1 file changed, 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9c24c7a9266..ad9d2b913bca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,47 +305,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-channel" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-executor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "once_cell", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6" -dependencies = [ - "async-channel", - "async-executor", - "async-io", - "async-mutex", - "blocking", - "futures-lite", - "num_cpus", - "once_cell", -] - [[package]] name = "async-io" version = "1.6.0" @@ -374,21 +333,6 @@ dependencies = [ "event-listener", ] -[[package]] -name = "async-mutex" -version = "1.4.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" -dependencies = [ - "event-listener", -] - -[[package]] -name = "async-task" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" - [[package]] name = "async-trait" version = "0.1.58" @@ -699,20 +643,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" -[[package]] -name = "blocking" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427" -dependencies = [ - "async-channel", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - [[package]] name = "bounded-vec" version = "0.6.0"