Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Propagate errors and shutdown preparation and execution pipelines properly
Browse files Browse the repository at this point in the history
  • Loading branch information
s0me0ne-unkn0wn committed Mar 13, 2023
1 parent 335495b commit b96cc31
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 58 deletions.
1 change: 1 addition & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub enum Subcommand {
#[derive(Debug, Parser)]
pub struct ValidationWorkerCommand {
/// The path to the validation host's socket.
#[arg(long)]
pub socket_path: String,
/// Calling node implementation version
#[arg(long)]
Expand Down
5 changes: 4 additions & 1 deletion cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,10 @@ pub fn run() -> Result<()> {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::execute_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
mod queue;
mod worker;

pub use queue::{start, ToQueue};
pub use queue::{start, FromQueue, ToQueue};
pub use worker::{worker_entrypoint, Response as ExecuteResponse};
82 changes: 62 additions & 20 deletions node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ pub enum ToQueue {
executor_params: ExecutorParams,
result_tx: ResultSender,
},
Shutdown,
}

/// Notifications the execution queue reports through its `from_queue_tx` sender; the matching
/// receiver is handed out by `start`.
#[derive(Debug)]
pub enum FromQueue {
/// Node and worker version mismatch. Sent when a job finishes with `Outcome::VersionMismatch`.
VersionMismatch,
/// Queue shutdown complete. Sent after a `ToQueue::Shutdown` message has been handled.
ShutdownComplete,
}

struct ExecuteJob {
Expand Down Expand Up @@ -128,10 +137,13 @@ enum QueueEvent {
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;

struct Queue {
running: bool,
metrics: Metrics,

/// The receiver that receives messages to the pool.
to_queue_rx: mpsc::Receiver<ToQueue>,
/// The sender for reporting error conditions
from_queue_tx: mpsc::UnboundedSender<FromQueue>,

program_path: PathBuf,
spawn_timeout: Duration,
Expand All @@ -149,12 +161,15 @@ impl Queue {
worker_capacity: usize,
spawn_timeout: Duration,
to_queue_rx: mpsc::Receiver<ToQueue>,
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
) -> Self {
Self {
running: true,
metrics,
program_path,
spawn_timeout,
to_queue_rx,
from_queue_tx,
queue: VecDeque::new(),
mux: Mux::new(),
workers: Workers {
Expand Down Expand Up @@ -259,23 +274,34 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
}

/// Handles one `ToQueue` message: either enqueues an execution job or shuts the queue down.
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
// NOTE(review): this is a diff view without +/- markers. The statements from here down to the
// first `queue.try_assign_next_job(None);` appear to be the pre-change body (removed lines);
// the `match to_queue` that follows is the replacement introduced by this commit.
let ToQueue::Enqueue { artifact, exec_timeout, params, executor_params, result_tx } = to_queue;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
let job = ExecuteJob {
artifact,
exec_timeout,
params,
executor_params,
result_tx,
waiting_since: Instant::now(),
};
queue.queue.push_back(job);
queue.try_assign_next_job(None);
// Post-change body: dispatch on the message kind now that `ToQueue` has a `Shutdown` variant.
match to_queue {
ToQueue::Enqueue { artifact, exec_timeout, params, executor_params, result_tx } => {
// Once shutdown has begun, new jobs are dropped here; nothing is sent on `result_tx`.
if !queue.running {
return
}
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
// Record the enqueue time in `waiting_since` alongside the job's parameters.
let job = ExecuteJob {
artifact,
exec_timeout,
params,
executor_params,
result_tx,
waiting_since: Instant::now(),
};
queue.queue.push_back(job);
queue.try_assign_next_job(None);
},
ToQueue::Shutdown => {
// Stop accepting jobs, clear `queue.workers.running`, and report completion.
// The send result is deliberately ignored (`let _ =`).
queue.running = false;
queue.workers.running.clear();
let _ = queue.from_queue_tx.unbounded_send(FromQueue::ShutdownComplete);
},
}
}

async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
Expand All @@ -295,6 +321,9 @@ fn handle_worker_spawned(
handle: WorkerHandle,
job: ExecuteJob,
) {
if !queue.running {
return
}
queue.metrics.execute_worker().on_spawned();
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData {
Expand Down Expand Up @@ -337,6 +366,10 @@ fn handle_job_finish(
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)),
None,
),
Outcome::VersionMismatch => {
let _ = queue.from_queue_tx.unbounded_send(FromQueue::VersionMismatch);
return
},
};

queue.metrics.execute_finished();
Expand Down Expand Up @@ -468,8 +501,17 @@ pub fn start(
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
(to_queue_tx, run)
let (from_queue_tx, from_queue_rx) = mpsc::unbounded();
let run = Queue::new(
metrics,
program_path,
worker_capacity,
spawn_timeout,
to_queue_rx,
from_queue_tx,
)
.run();
(to_queue_tx, from_queue_rx, run)
}
28 changes: 24 additions & 4 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ pub enum Outcome {
InternalError { err: String, idle_worker: IdleWorker },
/// The execution time exceeded the hard limit. The worker is terminated.
HardTimeout,
/// The worker recognized a version mismatch between node software and worker.
/// Node should be shut down.
VersionMismatch,
/// An I/O error happened during communication with the worker. This may mean that the worker
/// process already died. The token is not returned in any case.
IoErr,
Expand Down Expand Up @@ -172,6 +175,7 @@ pub async fn start_work(
Response::InvalidCandidate(err) =>
Outcome::InvalidCandidate { err, idle_worker: IdleWorker { stream, pid } },
Response::TimedOut => Outcome::HardTimeout,
Response::VersionMismatch => Outcome::VersionMismatch,
Response::InternalError(err) =>
Outcome::InternalError { err, idle_worker: IdleWorker { stream, pid } },
}
Expand Down Expand Up @@ -247,6 +251,7 @@ pub enum Response {
InvalidCandidate(String),
TimedOut,
InternalError(String),
VersionMismatch,
}

impl Response {
Expand All @@ -261,19 +266,34 @@ impl Response {

/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
let handshake = recv_handshake(&mut stream).await?;
let worker_pid = std::process::id();
let version_mismatch = if let Some(version) = node_version {
version != env!("SUBSTRATE_CLI_IMPL_VERSION")
} else {
false
};

let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);

loop {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
if version_mismatch {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"worker: node and worker version mismatch",
);
send_response(&mut stream, Response::VersionMismatch).await?;
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
%worker_pid,
"worker: validating artifact {}",
artifact_path.display(),
);
Expand Down Expand Up @@ -307,7 +327,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
%worker_pid,
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_time_elapsed.as_millis(),
execution_timeout.as_millis(),
Expand Down
Loading

0 comments on commit b96cc31

Please sign in to comment.