Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Use PVF code paired with executor params wherever possible (#6742)
Browse files Browse the repository at this point in the history
  • Loading branch information
s0me0ne-unkn0wn authored Feb 20, 2023
1 parent 7cc5b88 commit d9c8e6c
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 99 deletions.
8 changes: 4 additions & 4 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#![warn(missing_docs)]

use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf,
PvfWithExecutorParams, ValidationError, ValidationHost,
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfWithExecutorParams,
ValidationError, ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
Expand Down Expand Up @@ -334,7 +334,7 @@ where
&validation_code.0,
VALIDATION_CODE_BOMB_LIMIT,
) {
Ok(code) => PvfWithExecutorParams::new(Pvf::from_code(code.into_owned()), executor_params),
Ok(code) => PvfWithExecutorParams::from_code(code.into_owned(), executor_params),
Err(e) => {
gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code");
return PreCheckOutcome::Invalid
Expand Down Expand Up @@ -683,7 +683,7 @@ trait ValidationBackend {
) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf_with_params =
PvfWithExecutorParams::new(Pvf::from_code(raw_validation_code), executor_params);
PvfWithExecutorParams::from_code(raw_validation_code, executor_params);

let mut validation_result =
self.validate_candidate(pvf_with_params.clone(), timeout, params.encode()).await;
Expand Down
6 changes: 3 additions & 3 deletions node/core/pvf/src/executor_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error
/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk.
pub fn prepare(
blob: RuntimeBlob,
executor_params: ExecutorParams,
executor_params: &ExecutorParams,
) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
let semantics = params_to_wasmtime_semantics(executor_params)
.map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
}

fn params_to_wasmtime_semantics(par: ExecutorParams) -> Result<Semantics, String> {
fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, String> {
let mut sem = DEFAULT_CONFIG.semantics.clone();
let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() {
stack_limit
Expand Down Expand Up @@ -186,7 +186,7 @@ impl Executor {
TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?;

let mut config = DEFAULT_CONFIG.clone();
config.semantics = params_to_wasmtime_semantics(params)?;
config.semantics = params_to_wasmtime_semantics(&params)?;

Ok(Self { thread_pool, spawner, config })
}
Expand Down
8 changes: 4 additions & 4 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ async fn handle_execute_pvf(
artifact: ArtifactPathId::new(artifact_id, cache_path),
execution_timeout,
params,
executor_params: pvf_with_params.executor_params(),
executor_params: (*pvf_with_params.executor_params()).clone(),
result_tx,
},
)
Expand All @@ -534,7 +534,7 @@ async fn handle_execute_pvf(
artifact_id,
execution_timeout,
params,
pvf_with_params.executor_params(),
(*pvf_with_params.executor_params()).clone(),
result_tx,
);
},
Expand All @@ -556,7 +556,7 @@ async fn handle_execute_pvf(
waiting_for_response: Vec::new(),
num_failures: *num_failures,
};
let executor_params = pvf_with_params.executor_params().clone();
let executor_params = (*pvf_with_params.executor_params()).clone();
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
Expand Down Expand Up @@ -584,7 +584,7 @@ async fn handle_execute_pvf(
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
// PVF.
let executor_params = pvf_with_params.executor_params();
let executor_params = (*pvf_with_params.executor_params()).clone();
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(
prepare_queue,
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub use sp_tracing;
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats;
pub use priority::Priority;
pub use pvf::{Pvf, PvfWithExecutorParams};
pub use pvf::PvfWithExecutorParams;

pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
Expand Down
17 changes: 6 additions & 11 deletions node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@ use super::worker::{self, Outcome};
use crate::{
error::{PrepareError, PrepareResult},
metrics::Metrics,
pvf::PvfWithExecutorParams,
worker_common::{IdleWorker, WorkerHandle},
LOG_TARGET,
};
use always_assert::never;
use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
};
use polkadot_primitives::vstaging::ExecutorParams;
use slotmap::HopSlotMap;
use std::{
fmt,
path::{Path, PathBuf},
sync::Arc,
task::Poll,
time::Duration,
};
Expand Down Expand Up @@ -68,9 +67,8 @@ pub enum ToPool {
/// sent until either `Concluded` or `Rip` message is received.
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
pvf_with_params: PvfWithExecutorParams,
artifact_path: PathBuf,
executor_params: ExecutorParams,
preparation_timeout: Duration,
},
}
Expand Down Expand Up @@ -216,7 +214,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, executor_params, preparation_timeout } => {
ToPool::StartWork { worker, pvf_with_params, artifact_path, preparation_timeout } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
Expand All @@ -225,10 +223,9 @@ fn handle_to_pool(
metrics.clone(),
worker,
idle,
code,
pvf_with_params,
cache_path.to_owned(),
artifact_path,
executor_params,
preparation_timeout,
preparation_timer,
)
Expand Down Expand Up @@ -275,20 +272,18 @@ async fn start_work_task<Timer>(
metrics: Metrics,
worker: Worker,
idle: IdleWorker,
code: Arc<Vec<u8>>,
pvf_with_params: PvfWithExecutorParams,
cache_path: PathBuf,
artifact_path: PathBuf,
executor_params: ExecutorParams,
preparation_timeout: Duration,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome = worker::start_work(
&metrics,
idle,
code,
pvf_with_params,
&cache_path,
artifact_path,
executor_params,
preparation_timeout,
)
.await;
Expand Down
3 changes: 1 addition & 2 deletions node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,8 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
&mut queue.to_pool_tx,
pool::ToPool::StartWork {
worker,
code: job_data.pvf_with_params.code(),
pvf_with_params: job_data.pvf_with_params.clone(),
artifact_path,
executor_params: job_data.pvf_with_params.executor_params(),
preparation_timeout: job_data.preparation_timeout,
},
)
Expand Down
48 changes: 22 additions & 26 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
error::{PrepareError, PrepareResult},
metrics::Metrics,
prepare::PrepareStats,
pvf::PvfWithExecutorParams,
worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
Expand All @@ -34,12 +35,12 @@ use crate::{
use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode};
use polkadot_primitives::vstaging::ExecutorParams;

use sp_core::hexdisplay::HexDisplay;
use std::{
panic,
path::{Path, PathBuf},
sync::{mpsc::channel, Arc},
sync::mpsc::channel,
time::Duration,
};
use tokio::{io, net::UnixStream};
Expand Down Expand Up @@ -83,10 +84,9 @@ pub enum Outcome {
pub async fn start_work(
metrics: &Metrics,
worker: IdleWorker,
code: Arc<Vec<u8>>,
pvf_with_params: PvfWithExecutorParams,
cache_path: &Path,
artifact_path: PathBuf,
executor_params: ExecutorParams,
preparation_timeout: Duration,
) -> Outcome {
let IdleWorker { stream, pid } = worker;
Expand All @@ -100,7 +100,7 @@ pub async fn start_work(

with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
if let Err(err) =
send_request(&mut stream, code, &tmp_file, &executor_params, preparation_timeout).await
send_request(&mut stream, pvf_with_params, &tmp_file, preparation_timeout).await
{
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -273,44 +273,42 @@ where

async fn send_request(
stream: &mut UnixStream,
code: Arc<Vec<u8>>,
pvf_with_params: PvfWithExecutorParams,
tmp_file: &Path,
executor_params: &ExecutorParams,
preparation_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, &code).await?;
framed_send(stream, &pvf_with_params.encode()).await?;
framed_send(stream, path_to_bytes(tmp_file)).await?;
framed_send(stream, &executor_params.encode()).await?;
framed_send(stream, &preparation_timeout.encode()).await?;
Ok(())
}

async fn recv_request(
stream: &mut UnixStream,
) -> io::Result<(Vec<u8>, PathBuf, ExecutorParams, Duration)> {
let code = framed_recv(stream).await?;
) -> io::Result<(PvfWithExecutorParams, PathBuf, Duration)> {
let pvf_with_params = framed_recv(stream).await?;
let pvf_with_params =
PvfWithExecutorParams::decode(&mut &pvf_with_params[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_request: failed to decode PvfWithExecutorParams: {}", e),
)
})?;
let tmp_file = framed_recv(stream).await?;
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
let executor_params_enc = framed_recv(stream).await?;
let executor_params = ExecutorParams::decode(&mut &executor_params_enc[..]).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"prepare pvf recv_request: failed to decode ExecutorParams".to_string(),
)
})?;
let preparation_timeout = framed_recv(stream).await?;
let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_request: failed to decode duration: {:?}", e),
)
})?;
Ok((code, tmp_file, executor_params, preparation_timeout))
Ok((pvf_with_params, tmp_file, preparation_timeout))
}

async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
Expand Down Expand Up @@ -362,8 +360,7 @@ pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
loop {
let worker_pid = std::process::id();
let (code, dest, executor_params, preparation_timeout) =
recv_request(&mut stream).await?;
let (pvf_with_params, dest, preparation_timeout) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
Expand All @@ -388,7 +385,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Spawn another thread for preparation.
let prepare_fut = rt_handle
.spawn_blocking(move || {
let result = prepare_artifact(&code, executor_params);
let result = prepare_artifact(pvf_with_params);

// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -471,16 +468,15 @@ pub fn worker_entrypoint(socket_path: &str) {
}

fn prepare_artifact(
code: &[u8],
executor_params: ExecutorParams,
pvf_with_params: PvfWithExecutorParams,
) -> Result<CompiledArtifact, PrepareError> {
panic::catch_unwind(|| {
let blob = match crate::executor_intf::prevalidate(code) {
let blob = match crate::executor_intf::prevalidate(&pvf_with_params.code()) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b,
};

match crate::executor_intf::prepare(blob, executor_params) {
match crate::executor_intf::prepare(blob, &pvf_with_params.executor_params()) {
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
Expand Down
Loading

0 comments on commit d9c8e6c

Please sign in to comment.