Skip to content

Commit

Permalink
Merge branch 'master' into fix/accept-obsolete-column-families
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilsa authored Dec 30, 2024
2 parents 2415d67 + 1f04f5d commit 0d3f92e
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 98 deletions.
2 changes: 1 addition & 1 deletion core/application/impl/kagome_application_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ namespace kagome::application {
if (not app_config_->disableSecureMode() and app_config_->usePvfSubprocess()
and app_config_->roles().isAuthority()) {
auto res = parachain::runSecureModeCheckProcess(
*injector_.injectIoContext(), app_config_->runtimeCacheDirPath());
app_config_->runtimeCacheDirPath());
if (!res) {
SL_ERROR(logger_, "Secure mode check failed: {}", res.error());
exit(EXIT_FAILURE);
Expand Down
29 changes: 28 additions & 1 deletion core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@
#include "runtime/runtime_api/impl/transaction_payment_api.hpp"
#include "runtime/wabt/instrument.hpp"
#include "runtime/wasm_compiler_definitions.hpp" // this header-file is generated
#include "utils/sptr.hpp"

#if KAGOME_WASM_COMPILER_WASM_EDGE == 1

Expand Down Expand Up @@ -790,6 +791,32 @@ namespace {
di::bind<parachain::BackedCandidatesSource>.template to<parachain::ParachainProcessorImpl>(),
di::bind<network::CanDisconnect>.template to<parachain::statement_distribution::StatementDistribution>(),
di::bind<parachain::Pvf>.template to<parachain::PvfImpl>(),
bind_by_lambda<parachain::SecureModeSupport>([config](const auto &) {
auto support = parachain::SecureModeSupport::none();
auto log = log::createLogger("Application", "application");
#ifdef __linux__
if (not config->disableSecureMode() and config->usePvfSubprocess()
and config->roles().isAuthority()) {
auto res = parachain::runSecureModeCheckProcess(config->runtimeCacheDirPath());
if (!res) {
SL_ERROR(log, "Secure mode check failed: {}", res.error());
exit(EXIT_FAILURE);
}
support = res.value();
if (not support.isTotallySupported()) {
SL_ERROR(log,
"Secure mode is not supported completely. You can disable it "
"using --insecure-validator-i-know-what-i-do.");
exit(EXIT_FAILURE);
}
}
#else
SL_WARN(log,
"Secure validator mode is not implemented for the current "
"platform. Proceed at your own risk.");
#endif
return toSptr(support);
}),
di::bind<network::CollationObserver>.template to<parachain::ParachainObserverImpl>(),
di::bind<network::ValidationObserver>.template to<parachain::ParachainObserverImpl>(),
di::bind<network::ReqCollationObserver>.template to<parachain::ParachainObserverImpl>(),
Expand Down Expand Up @@ -936,7 +963,7 @@ namespace kagome::injector {
KagomeNodeInjector::KagomeNodeInjector(
sptr<application::AppConfiguration> app_config)
: pimpl_{std::make_unique<KagomeNodeInjectorImpl>(
makeKagomeNodeInjector(std::move(app_config)))} {}
makeKagomeNodeInjector(std::move(app_config)))} {}

sptr<application::AppConfiguration> KagomeNodeInjector::injectAppConfig() {
return pimpl_->injector_
Expand Down
132 changes: 132 additions & 0 deletions core/parachain/pvf/clone.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <sys/wait.h>
#include <unistd.h>
#include <cerrno>

#ifdef __linux__
#include <sched.h>
#include <csignal>
#endif

#include "parachain/pvf/pvf_worker_types.hpp"

namespace kagome::parachain::clone {
constexpr size_t kCloneStackSize = 2 << 20;

enum class CloneError : uint8_t {
kCallbackFailed,
};
Q_ENUM_ERROR_CODE(CloneError) {
using E = decltype(e);
switch (e) {
case E::kCallbackFailed:
return "Callback failed";
}
abort();
}

#ifdef __linux__
// https://github.com/paritytech/polkadot-sdk/blob/f4a196ab1473856c9c5992239fcc2f14c2c42914/polkadot/node/core/pvf/common/src/worker/security/clone.rs#L35-L54
/// Try to run clone(2) on the current worker.
///
/// SAFETY: new process should be either spawned within a single threaded
/// process, or use only async-signal-safe functions.
template <typename Cb>
inline outcome::result<pid_t> clone(bool have_unshare_newuser, const Cb &cb) {
Buffer stack(kCloneStackSize);
// https://github.com/paritytech/polkadot-sdk/blob/f4a196ab1473856c9c5992239fcc2f14c2c42914/polkadot/node/core/pvf/common/src/worker/security/clone.rs#L75-L93
int flags = CLONE_NEWCGROUP | CLONE_NEWIPC | CLONE_NEWNET | CLONE_NEWNS
| CLONE_NEWPID | CLONE_NEWUTS | SIGCHLD;
if (not have_unshare_newuser) {
flags |= CLONE_NEWUSER;
}
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
pid_t pid = ::clone(
[](void *arg) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto &cb = *reinterpret_cast<const Cb *>(arg);
return cb() ? EXIT_SUCCESS : EXIT_FAILURE;
},
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
stack.data() + stack.size(),
flags,
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
const_cast<void *>(static_cast<const void *>(&cb)));
if (pid == -1) {
return std::errc{errno};
}
return pid;
}
#endif

inline outcome::result<void> wait(pid_t pid) {
int status = 0;
if (waitpid(pid, &status, 0) == -1) {
return std::errc{errno};
}
if (not WIFEXITED(status) or WEXITSTATUS(status) != EXIT_SUCCESS) {
return CloneError::kCallbackFailed;
}
return outcome::success();
}

// https://github.com/paritytech/polkadot-sdk/blob/f4a196ab1473856c9c5992239fcc2f14c2c42914/polkadot/node/core/pvf/execute-worker/src/lib.rs#L245-L293
/// Call callback either directly, or inside `clone`, or inside `fork`.
inline outcome::result<void> cloneOrFork(const log::Logger &log,
const PvfWorkerInputConfig &config,
const auto &cb) {
auto cb_log = [&] {
auto r = cb();
if (not r) {
SL_WARN(log, "cloneOrFork cb returned error: {}", r.error());
return false;
}
return true;
};
if (config.force_disable_secure_mode) {
if (not cb_log()) {
return CloneError::kCallbackFailed;
}
return outcome::success();
}
std::optional<pid_t> pid;
#ifdef __linux__
if (config.secure_mode_support.can_do_secure_clone) {
BOOST_OUTCOME_TRY(pid, clone(config.secure_mode_support.chroot, cb_log));
}
#endif
if (not pid) {
pid = fork();
if (pid == -1) {
return std::errc{errno};
}
if (pid == 0) {
_Exit(cb_log() ? EXIT_SUCCESS : EXIT_FAILURE);
}
}
return wait(*pid);
}

// https://github.com/paritytech/polkadot-sdk/blob/f4a196ab1473856c9c5992239fcc2f14c2c42914/polkadot/node/core/pvf/common/src/worker/security/clone.rs#L56-L63
/// Runs a check for clone(2) with all sandboxing flags and returns an error
/// indicating whether it can be fully enabled on the current Linux
/// environment.
///
/// SAFETY: new process should be either spawned within a single threaded
/// process, or use only async-signal-safe functions.
inline outcome::result<void> check() {
#ifdef __linux__
OUTCOME_TRY(pid, clone(false, [] { return true; }));
return wait(pid);
#else
return std::errc::not_supported;
#endif
}
} // namespace kagome::parachain::clone
95 changes: 50 additions & 45 deletions core/parachain/pvf/kagome_pvf_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <filesystem>
#include <iostream>
#include <memory>
#include <ranges>
#include <span>
Expand All @@ -23,6 +22,7 @@

#include <fmt/format.h>
#include <boost/asio.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <boost/process.hpp>
#include <libp2p/basic/scheduler/asio_scheduler_backend.hpp>
#include <libp2p/basic/scheduler/scheduler_impl.hpp>
Expand All @@ -34,6 +34,7 @@
#include "common/bytestr.hpp"
#include "log/configurator.hpp"
#include "log/logger.hpp"
#include "parachain/pvf/clone.hpp"
#include "parachain/pvf/kagome_pvf_worker.hpp"
#include "parachain/pvf/kagome_pvf_worker_injector.hpp"
#include "parachain/pvf/pvf_worker_types.hpp"
Expand Down Expand Up @@ -62,6 +63,8 @@
}

namespace kagome::parachain {
using unix = boost::asio::local::stream_protocol;

namespace {
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static kagome::log::Logger logger;
Expand Down Expand Up @@ -229,26 +232,21 @@ namespace kagome::parachain {
}
#endif

outcome::result<void> readStdin(std::span<uint8_t> out) {
std::cin.read(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<char *>(out.data()),
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
out.size());
if (not std::cin.good()) {
return std::errc::io_error;
}
return outcome::success();
}

template <typename T>
outcome::result<T> decodeInput() {
outcome::result<T> decodeInput(unix::socket &socket) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init,hicpp-member-init)
std::array<uint8_t, sizeof(uint32_t)> length_bytes;
OUTCOME_TRY(readStdin(length_bytes));
boost::system::error_code ec;
boost::asio::read(socket, boost::asio::buffer(length_bytes), ec);
if (ec) {
return ec;
}
OUTCOME_TRY(message_length, scale::decode<uint32_t>(length_bytes));
std::vector<uint8_t> packed_message(message_length, 0);
OUTCOME_TRY(readStdin(packed_message));
boost::asio::read(socket, boost::asio::buffer(packed_message), ec);
if (ec) {
return ec;
}
return scale::decode<T>(packed_message);
}

Expand Down Expand Up @@ -282,8 +280,16 @@ namespace kagome::parachain {
}
}

outcome::result<void> pvf_worker_main_outcome() {
OUTCOME_TRY(input_config, decodeInput<PvfWorkerInputConfig>());
outcome::result<void> pvf_worker_main_outcome(
const std::string &unix_socket_path) {
boost::asio::io_context io_context;
unix::socket socket{io_context};
boost::system::error_code ec;
socket.connect(unix_socket_path, ec);
if (ec) {
return ec;
}
OUTCOME_TRY(input_config, decodeInput<PvfWorkerInputConfig>(socket));
kagome::log::tuneLoggingSystem(input_config.log_params);

SL_VERBOSE(logger, "Cache directory: {}", input_config.cache_dir);
Expand Down Expand Up @@ -347,7 +353,7 @@ namespace kagome::parachain {
OUTCOME_TRY(factory, createModuleFactory(injector, input_config.engine));
std::shared_ptr<runtime::Module> module;
while (true) {
OUTCOME_TRY(input, decodeInput<PvfWorkerInput>());
OUTCOME_TRY(input, decodeInput<PvfWorkerInput>(socket));

if (auto *code_params = std::get_if<PvfWorkerInputCodeParams>(&input)) {
auto &path = code_params->path;
Expand All @@ -361,26 +367,27 @@ namespace kagome::parachain {
SL_ERROR(logger, "PvfWorkerInputCodeParams expected");
return std::errc::invalid_argument;
}
OUTCOME_TRY(instance, module->instantiate());

OUTCOME_TRY(ctx, runtime::RuntimeContextFactory::stateless(instance));
OUTCOME_TRY(
result,
instance->callExportFunction(ctx, "validate_block", input_args));
OUTCOME_TRY(instance->resetEnvironment());
OUTCOME_TRY(len, scale::encode<uint32_t>(result.size()));

std::cout.write(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<const char *>(len.data()),
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
len.size());
std::cout.write(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<const char *>(result.data()),
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
result.size());
std::cout.flush();
auto forked = [&]() -> outcome::result<void> {
OUTCOME_TRY(instance, module->instantiate());

OUTCOME_TRY(ctx, runtime::RuntimeContextFactory::stateless(instance));
OUTCOME_TRY(
result,
instance->callExportFunction(ctx, "validate_block", input_args));
OUTCOME_TRY(instance->resetEnvironment());
OUTCOME_TRY(len, scale::encode<uint32_t>(result.size()));

boost::asio::write(socket, boost::asio::buffer(len), ec);
if (ec) {
return ec;
}
boost::asio::write(socket, boost::asio::buffer(result), ec);
if (ec) {
return ec;
}
return outcome::success();
};
OUTCOME_TRY(clone::cloneOrFork(logger, input_config, forked));
}
}

Expand All @@ -399,14 +406,12 @@ namespace kagome::parachain {
}
kagome::log::setLoggingSystem(logging_system);
logger = kagome::log::createLogger("PVF Worker", "parachain");

if (!checkEnvVarsEmpty(env)) {
logger->error(
"PVF worker processes must not have any environment variables.");
if (argc < 2) {
SL_ERROR(logger, "missing unix socket path arg");
return EXIT_FAILURE;
}

if (auto r = pvf_worker_main_outcome(); not r) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
if (auto r = pvf_worker_main_outcome(argv[1]); not r) {
SL_ERROR(logger, "PVF worker process failed: {}", r.error());
return EXIT_FAILURE;
}
Expand Down
4 changes: 4 additions & 0 deletions core/parachain/pvf/kagome_pvf_worker_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
#include "crypto/ed25519/ed25519_provider_impl.hpp"
#include "crypto/elliptic_curves/elliptic_curves_impl.hpp"
#include "crypto/hasher/hasher_impl.hpp"
#include "crypto/key_store.hpp"
#include "crypto/pbkdf2/impl/pbkdf2_provider_impl.hpp"
#include "crypto/secp256k1/secp256k1_provider_impl.hpp"
#include "crypto/sr25519/sr25519_provider_impl.hpp"
#include "host_api/impl/host_api_factory_impl.hpp"
#include "injector/bind_by_lambda.hpp"
#include "offchain/offchain_persistent_storage.hpp"
#include "offchain/offchain_worker_pool.hpp"
#include "parachain/pvf/pvf_worker_types.hpp"
#include "runtime/binaryen/instance_environment_factory.hpp"
#include "runtime/binaryen/module/module_factory_impl.hpp"
#include "runtime/common/core_api_factory_impl.hpp"
#include "runtime/common/runtime_properties_cache_impl.hpp"
#include "runtime/memory_provider.hpp"
#include "runtime/module.hpp"
#include "runtime/runtime_instances_pool.hpp"
#include "runtime/wasm_compiler_definitions.hpp" // this header-file is generated
#include "storage/trie/serialization/trie_serializer_impl.hpp"
#include "storage/trie/trie_storage.hpp"
Expand Down
Loading

0 comments on commit 0d3f92e

Please sign in to comment.