Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Check spawned worker version vs node version before PVF preparation (#…
Browse files Browse the repository at this point in the history
…6861)

* Check spawned worker version vs node version before PVF preparation

* Address discussions

* Propagate errors and shutdown preparation and execution pipelines properly

* Add logs; Fix execution worker checks

* Revert "Propagate errors and shutdown preparation and execution pipelines properly"

This reverts commit b96cc31.

* Don't try to shut down; report the condition and exit worker

* Get rid of `VersionMismatch` preparation error

* Merge master

* Add docs; Fix tests

* Update Cargo.lock

* Kill again, but only the main node process

* Move unsafe code to a common safe function

* Fix libc dependency error on MacOS

* pvf spawning: Add some logging, add a small integration test

* Minor fixes

* Restart CI

---------

Co-authored-by: Marcin S <marcin@realemail.net>
  • Loading branch information
2 people authored and coderobe committed Apr 4, 2023
1 parent 0d8d6ce commit 271b10d
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ pub enum Subcommand {
// Command-line arguments of a spawned validation worker process.
// NOTE(review): `Parser` and `#[arg(long)]` look like clap's derive API; if so, the `///`
// comments on the fields double as the `--help` text of each flag, i.e. they are user-facing
// strings rather than plain documentation — keep that in mind when editing them.
#[derive(Debug, Parser)]
pub struct ValidationWorkerCommand {
/// The path to the validation host's socket.
// Supplied by the host as `--socket-path <path>` when it spawns the worker.
#[arg(long)]
pub socket_path: String,
/// Calling node implementation version
// Supplied by the host as `--node-impl-version <version>`; the worker entrypoints compare it
// with their own compiled-in version and treat a difference as a fatal mismatch.
#[arg(long)]
pub node_impl_version: String,
}

#[allow(missing_docs)]
Expand Down
10 changes: 8 additions & 2 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,10 @@ pub fn run() -> Result<()> {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::prepare_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Expand All @@ -513,7 +516,10 @@ pub fn run() -> Result<()> {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::execute_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Expand Down
5 changes: 4 additions & 1 deletion node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
libc = "0.2.139"
pin-project = "1.0.9"
rand = "0.8.5"
rayon = "1.5.1"
Expand All @@ -41,8 +42,10 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch =
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.41" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.41" }

[build-dependencies]
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }

[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2.139"
tikv-jemalloc-ctl = "0.5.0"

[dev-dependencies]
Expand Down
19 changes: 19 additions & 0 deletions node/core/pvf/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2017-2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

// Build script entry point: delegates to `substrate_build_script_utils` to generate the cargo
// keys for this crate.
// NOTE(review): presumably this is what defines `SUBSTRATE_CLI_IMPL_VERSION`, which the worker
// code reads at compile time via `env!` — confirm against `substrate-build-script-utils`.
fn main() {
substrate_build_script_utils::generate_cargo_keys();
}
34 changes: 26 additions & 8 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ pub async fn spawn(
executor_params: ExecutorParams,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let (mut idle_worker, worker_handle) =
spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout)
.await?;
let (mut idle_worker, worker_handle) = spawn_with_program_path(
"execute",
program_path,
&["execute-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
spawn_timeout,
)
.await?;
send_handshake(&mut idle_worker.stream, Handshake { executor_params })
.await
.map_err(|error| {
Expand Down Expand Up @@ -260,11 +264,25 @@ impl Response {
}

/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
let handshake = recv_handshake(&mut stream).await?;
let worker_pid = std::process::id();
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
crate::kill_parent_node_in_emergency();
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
}

let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);
Expand All @@ -273,7 +291,7 @@ pub fn worker_entrypoint(socket_path: &str) {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
%worker_pid,
"worker: validating artifact {}",
artifact_path.display(),
);
Expand Down Expand Up @@ -307,7 +325,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
%worker_pid,
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_time_elapsed.as_millis(),
execution_timeout.as_millis(),
Expand Down
1 change: 1 addition & 0 deletions node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub use pvf::PvfPrepData;

pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
pub(crate) use worker_common::kill_parent_node_in_emergency;
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;

pub use execute::worker_entrypoint as execute_worker_entrypoint;
Expand Down
28 changes: 24 additions & 4 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ pub async fn spawn(
program_path: &Path,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
spawn_with_program_path("prepare", program_path, &["prepare-worker"], spawn_timeout).await
spawn_with_program_path(
"prepare",
program_path,
&["prepare-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
spawn_timeout,
)
.await
}

pub enum Outcome {
Expand Down Expand Up @@ -321,7 +327,9 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
}

/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
///
/// # Flow
///
Expand All @@ -342,10 +350,22 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
///
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(socket_path: &str) {
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
let worker_pid = std::process::id();
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
crate::kill_parent_node_in_emergency();
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
}

loop {
let worker_pid = std::process::id();
let (pvf, dest) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
Expand Down
22 changes: 17 additions & 5 deletions node/core/pvf/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,34 @@ macro_rules! decl_puppet_worker_main {
$crate::sp_tracing::try_init_simple();

let args = std::env::args().collect::<Vec<_>>();
if args.len() < 2 {
if args.len() < 3 {
panic!("wrong number of arguments");
}

let mut version = None;
let mut socket_path: &str = "";

for i in 2..args.len() {
match args[i].as_ref() {
"--socket-path" => socket_path = args[i + 1].as_str(),
"--node-version" => version = Some(args[i + 1].as_str()),
_ => (),
}
}

let subcommand = &args[1];
match subcommand.as_ref() {
"exit" => {
std::process::exit(1);
},
"sleep" => {
std::thread::sleep(std::time::Duration::from_secs(5));
},
"prepare-worker" => {
let socket_path = &args[2];
$crate::prepare_worker_entrypoint(socket_path);
$crate::prepare_worker_entrypoint(&socket_path, version);
},
"execute-worker" => {
let socket_path = &args[2];
$crate::execute_worker_entrypoint(socket_path);
$crate::execute_worker_entrypoint(&socket_path, version);
},
other => panic!("unknown subcommand: {}", other),
}
Expand Down
43 changes: 41 additions & 2 deletions node/core/pvf/src/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,21 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot bind unix socket: {:?}",
err,
);
SpawnErr::Bind
})?;

let handle =
WorkerHandle::spawn(program_path, extra_args, socket_path).map_err(|err| {
WorkerHandle::spawn(&program_path, extra_args, socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot spawn a worker: {:?}",
err,
);
Expand All @@ -84,6 +88,8 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot accept a worker: {:?}",
err,
);
Expand All @@ -92,6 +98,14 @@ pub async fn spawn_with_program_path(
Ok((IdleWorker { stream, pid: handle.id() }, handle))
}
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)
}
}
Expand Down Expand Up @@ -162,6 +176,13 @@ where
F: FnMut(Handle, UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"starting pvf worker ({})",
debug_id,
);

let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let handle = rt.handle();
let err = rt
Expand All @@ -179,7 +200,7 @@ where
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"pvf worker ({}): {:?}",
"quitting pvf worker ({}): {:?}",
debug_id,
err,
);
Expand Down Expand Up @@ -280,6 +301,7 @@ impl WorkerHandle {
) -> io::Result<Self> {
let mut child = process::Command::new(program.as_ref())
.args(extra_args)
.arg("--socket-path")
.arg(socket_path.as_ref().as_os_str())
.stdout(std::process::Stdio::piped())
.kill_on_drop(true)
Expand Down Expand Up @@ -393,3 +415,20 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
r.read_exact(&mut buf).await?;
Ok(buf)
}

/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGKILL`
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
/// no leftover artifacts are possible.
///
/// The signal is sent to the parent of the calling process, as reported by `getppid()`. Nothing
/// is sent when the reported parent PID is 0 or 1. The function itself returns normally; exiting
/// the worker is left to the caller.
pub(crate) fn kill_parent_node_in_emergency() {
unsafe {
// SAFETY: both calls take plain integer arguments and no pointers, so there are no
// memory-safety preconditions to uphold. `getppid()` never fails but may return
// "no-parent" (0) or "parent-init" (1) in some corner cases, which is checked: signalling
// PID 0 would target our own process group and PID 1 would target init.
// `kill()` can fail (e.g. the parent has already exited, or permission is denied); its
// return value is ignored here.
// NOTE(review): consider logging a failed `kill()` so the condition is visible to operators.
let ppid = libc::getppid();
if ppid > 1 {
libc::kill(ppid, libc::SIGKILL);
}
}
}
9 changes: 9 additions & 0 deletions node/core/pvf/tests/it/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ use crate::PUPPET_EXE;
use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr};
use std::time::Duration;

// The puppet's `exit` subcommand terminates the process with a failure code right away, so the
// spawned program never connects back to the host socket. Spawning is therefore expected to give
// up once the spawn timeout elapses and report `SpawnErr::AcceptTimeout`.
#[tokio::test]
async fn spawn_immediate_exit() {
    let spawn_timeout = Duration::from_secs(2);
    let extra_args = ["exit"];

    let result =
        spawn_with_program_path("integration-test", PUPPET_EXE, &extra_args, spawn_timeout).await;

    assert!(matches!(result, Err(SpawnErr::AcceptTimeout)));
}

#[tokio::test]
async fn spawn_timeout() {
let result =
Expand Down
4 changes: 2 additions & 2 deletions node/malus/src/malus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl MalusCli {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path, None);
}
},
NemesisVariant::PvfExecuteWorker(cmd) => {
Expand All @@ -108,7 +108,7 @@ impl MalusCli {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path, None);
}
},
}
Expand Down

0 comments on commit 271b10d

Please sign in to comment.