Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pvf refactor execute worker errors follow up #4071

Merged
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 14 additions & 8 deletions polkadot/node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ cfg-if = "1.0"
futures = "0.3.30"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
is_executable = "1.0.1"
libc = "0.2.152"
is_executable = { version = "1.0.1", optional = true }
pin-project = "1.0.9"
rand = "0.8.5"
slotmap = "1.0"
tempfile = "3.3.0"
thiserror = { workspace = true }
tokio = { version = "1.24.2", features = ["fs", "process"] }

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
parity-scale-codec = { version = "3.6.1", default-features = false, features = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, different formatter settings here and below

Copy link
Contributor Author

@maksimryndin maksimryndin Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I've got it right, but it is the result of applying taplo fmt polkadot/node/core/pvf/Cargo.toml --config .config/taplo.toml as in the ci job

"derive",
] }

polkadot-parachain-primitives = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
Expand All @@ -37,14 +38,16 @@ polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-primitives = { path = "../../../primitives" }

sp-core = { path = "../../../../substrate/primitives/core" }
sp-wasm-interface = { path = "../../../../substrate/primitives/wasm-interface" }
sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-compressed-blob" }
sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-compressed-blob", optional = true }
polkadot-node-core-pvf-prepare-worker = { path = "prepare-worker", optional = true }
polkadot-node-core-pvf-execute-worker = { path = "execute-worker", optional = true }

[dev-dependencies]
assert_matches = "1.4.0"
criterion = { version = "0.4.0", default-features = false, features = ["async_tokio", "cargo_bench_support"] }
criterion = { version = "0.4.0", default-features = false, features = [
"async_tokio",
"cargo_bench_support",
] }
hex-literal = "0.4.1"

polkadot-node-core-pvf-common = { path = "common", features = ["test-utils"] }
Expand All @@ -57,6 +60,7 @@ adder = { package = "test-parachain-adder", path = "../../../parachain/test-para
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }

[target.'cfg(target_os = "linux")'.dev-dependencies]
libc = "0.2.153"
procfs = "0.16.0"
rusty-fork = "0.3.0"
sc-sysinfo = { path = "../../../../substrate/client/sysinfo" }
Expand All @@ -70,6 +74,8 @@ ci-only-tests = []
jemalloc-allocator = ["polkadot-node-core-pvf-common/jemalloc-allocator"]
# This feature is used to export test code to other crates without putting it in the production build.
test-utils = [
"polkadot-node-core-pvf-execute-worker",
"polkadot-node-core-pvf-prepare-worker",
"dep:is_executable",
"dep:polkadot-node-core-pvf-execute-worker",
"dep:polkadot-node-core-pvf-prepare-worker",
"dep:sp-maybe-compressed-blob",
]
7 changes: 4 additions & 3 deletions polkadot/node/core/pvf/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ license.workspace = true
workspace = true

[dependencies]
cfg-if = "1.0"
cpu-time = "1.0.0"
futures = "0.3.30"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.152"
nix = { version = "0.27.1", features = ["resource", "sched"] }
thiserror = { workspace = true }

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
parity-scale-codec = { version = "3.6.1", default-features = false, features = [
"derive",
] }

polkadot-parachain-primitives = { path = "../../../../parachain" }
polkadot-primitives = { path = "../../../../primitives" }
Expand All @@ -34,7 +36,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" }

[target.'cfg(target_os = "linux")'.dependencies]
landlock = "0.3.0"
nix = { version = "0.27.1", features = ["sched"] }

[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies]
seccompiler = "0.4.0"
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/core/pvf/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ pub enum InternalValidationError {
/// Could not find or open compiled artifact file.
#[error("validation: could not find or open compiled artifact file: {0}")]
CouldNotOpenFile(String),
/// Could not create a pipe between the worker and a child process.
#[error("validation: could not create pipe: {0}")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring is missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!) Fixed 5bdda49

CouldNotCreatePipe(String),
/// Host could not clear the worker cache after a job.
#[error("validation: host could not clear the worker cache ({path:?}) after a job: {err}")]
CouldNotClearWorkerDir {
Expand Down
36 changes: 20 additions & 16 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,36 @@ pub struct Handshake {

/// The response from the execution worker.
#[derive(Debug, Encode, Decode)]
pub enum WorkerResponse {
/// The job completed successfully.
Ok {
/// The result of parachain validation.
result_descriptor: ValidationResult,
/// The amount of CPU time taken by the job.
duration: Duration,
},
/// The candidate is invalid.
InvalidCandidate(String),
/// Instantiation of the WASM module instance failed during an execution.
/// Possibly related to local issues or dirty node update. May be retried with re-preparation.
RuntimeConstruction(String),
pub struct WorkerResponse {
/// The response from the execute job process.
pub job_response: JobResponse,
/// The amount of CPU time taken by the job.
pub duration: Duration,
}

/// An error occurred in the worker process.
#[derive(thiserror::Error, Debug, Clone, Encode, Decode)]
pub enum WorkerError {
/// The job timed out.
#[error("The job timed out")]
JobTimedOut,
/// The job process has died. We must kill the worker just in case.
///
/// We cannot treat this as an internal error because malicious code may have killed the job.
/// We still retry it, because in the non-malicious case it is likely spurious.
#[error("The job process (pid {job_pid}) has died: {err}")]
JobDied { err: String, job_pid: i32 },
/// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic,
/// etc.
///
/// Because malicious code can cause a job error, we must not treat it as an internal error. We
/// still retry it, because in the non-malicious case it is likely spurious.
JobError(String),
#[error("An unexpected error occurred in the job process: {0}")]
JobError(#[from] JobError),

/// Some internal error occurred.
InternalError(InternalValidationError),
#[error("An internal error occurred: {0}")]
InternalError(#[from] InternalValidationError),
}

/// The result of a job on the execution worker.
Expand Down Expand Up @@ -101,7 +102,7 @@ impl JobResponse {
/// An unexpected error occurred in the execution job process. Because this comes from the job,
/// which executes untrusted code, this error must likewise be treated as untrusted. That is, we
/// cannot raise an internal error based on this.
#[derive(thiserror::Error, Debug, Encode, Decode)]
#[derive(thiserror::Error, Clone, Debug, Encode, Decode)]
pub enum JobError {
#[error("The job timed out")]
TimedOut,
Expand All @@ -114,4 +115,7 @@ pub enum JobError {
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
/// Since the job can return any exit status it wants, we have to treat this as untrusted.
#[error("Unexpected exit status: {0}")]
UnexpectedExitStatus(i32),
}
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Contains functionality related to PVFs that is shared by the PVF host and the PVF workers.
#![deny(unused_crate_dependencies)]

pub mod error;
pub mod execute;
Expand Down
7 changes: 1 addition & 6 deletions polkadot/node/core/pvf/common/src/pvf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@ use crate::prepare::PrepareJobKind;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain_primitives::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParams;
use std::{
cmp::{Eq, PartialEq},
fmt,
sync::Arc,
time::Duration,
};
use std::{fmt, sync::Arc, time::Duration};

/// A struct that carries the exhaustive set of data to prepare an artifact out of plain
/// Wasm binary
Expand Down
84 changes: 80 additions & 4 deletions polkadot/node/core/pvf/common/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
pub mod security;

use crate::{framed_recv_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET};
use crate::{
framed_recv_blocking, framed_send_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET,
};
use cpu_time::ProcessTime;
use futures::never::Never;
use parity_scale_codec::Decode;
use nix::{errno::Errno, sys::resource::Usage};
use parity_scale_codec::{Decode, Encode};
use std::{
any::Any,
fmt::{self},
Expand Down Expand Up @@ -58,8 +61,6 @@ macro_rules! decl_worker_main {

$crate::sp_tracing::try_init_simple();

let worker_pid = std::process::id();

let args = std::env::args().collect::<Vec<_>>();
if args.len() == 1 {
print_help($expected_command);
Expand Down Expand Up @@ -548,6 +549,81 @@ fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result<WorkerHandshake>
Ok(worker_handshake)
}

/// Calculate the total CPU time from the given `usage` structure, returned from
/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
/// and system time.
///
/// # Arguments
///
/// - `rusage`: Contains resource usage information.
///
/// # Returns
///
/// Returns a `Duration` representing the total CPU time.
pub fn get_total_cpu_usage(rusage: Usage) -> Duration {
let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
(rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;

return Duration::from_micros(micros)
}

/// Get a job response.
pub fn recv_child_response<T>(
received_data: &mut io::BufReader<&[u8]>,
context: &'static str,
) -> io::Result<T>
where
T: Decode,
{
let response_bytes = framed_recv_blocking(received_data)?;
T::decode(&mut response_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("{} pvf recv_child_response: decode error: {}", context, e),
)
})
}

pub fn send_result<T, E>(
stream: &mut UnixStream,
result: Result<T, E>,
worker_info: &WorkerInfo,
) -> io::Result<()>
where
T: std::fmt::Debug,
E: std::fmt::Debug + std::fmt::Display,
Result<T, E>: Encode,
{
if let Err(ref err) = result {
gum::warn!(
target: LOG_TARGET,
?worker_info,
"worker: error occurred: {}",
err
);
}
gum::trace!(
target: LOG_TARGET,
?worker_info,
"worker: sending result to host: {:?}",
result
);

framed_send_blocking(stream, &result.encode()).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
?worker_info,
"worker: error occurred sending result to host: {}",
err
);
err
})
}

pub fn stringify_errno(context: &'static str, errno: Errno) -> String {
format!("{}: {}: {}", context, errno, io::Error::last_os_error())
}

/// Functionality related to threads spawned by the workers.
///
/// The motivation for this module is to coordinate worker threads without using async Rust.
Expand Down
Loading
Loading