From 2b05cbc8c18beb9a80b196d214083d3a997d8cf4 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 19 Aug 2021 16:39:58 -0400 Subject: [PATCH] Set up Tokio runtime in main() Each entrypoint to the container bits sets up a tokio runtime, which is inefficient and duplicative. We're also likely to start using Rust async in more places. Instead, create a Tokio runtime early in our `main`, and change the CLI entrypoint to be an `async fn`. The other setup of a runtime we have is deep inside the sysroot upgrader bits, also for the container. In this case we actually have another thread (distinct from the main one where we set up Tokio) created by C/C++, so we need to pass a `tokio::runtime::Handle` across, and call `enter()` on it to set up the thread local bindings to access tokio async from there. I was initially looking at properly handling `GCancellable` with tokio and wanted to clean this up first. --- rust/src/container.rs | 12 +++----- rust/src/lib.rs | 11 +++++++ rust/src/main.rs | 43 ++++++++++++++++----------- rust/src/sysroot_upgrade.rs | 13 ++------ rust/src/tokio_ffi.rs | 16 ++++++++++ src/daemon/rpmostreed-transaction.cxx | 9 ++++++ 6 files changed, 69 insertions(+), 35 deletions(-) create mode 100644 rust/src/tokio_ffi.rs diff --git a/rust/src/container.rs b/rust/src/container.rs index 8e0d67d1f5..ea18e5ffda 100644 --- a/rust/src/container.rs +++ b/rust/src/container.rs @@ -2,19 +2,15 @@ // SPDX-License-Identifier: Apache-2.0 OR MIT -use anyhow::{Context, Result}; +use anyhow::Result; /// Main entrypoint for container -pub fn entrypoint(args: &[&str]) -> Result<()> { +pub async fn entrypoint(args: &[&str]) -> Result { // Right now we're only exporting the `container` bits, not tar. So inject that argument. // And we also need to skip the main arg and the `ex-container` arg. let args = ["rpm-ostree", "container"] .iter() .chain(args.iter().skip(2)); - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .context("Failed to build tokio runtime")? - .block_on(async { ostree_ext::cli::run_from_iter(args).await })?; - Ok(()) + ostree_ext::cli::run_from_iter(args).await?; + Ok(0) } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 68f1600ccf..878ee807ca 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -308,6 +308,15 @@ pub mod ffi { fn modularity_entrypoint(args: &Vec) -> Result<()>; } + // tokio_ffi.rs + extern "Rust" { + type TokioHandle; + type TokioEnterGuard<'a>; + + fn tokio_handle_get() -> Box; + unsafe fn enter<'a>(self: &'a TokioHandle) -> Box>; + } + // scripts.rs extern "Rust" { fn script_is_ignored(pkg: &str, script: &str) -> bool; @@ -644,6 +653,8 @@ use passwd::*; mod console_progress; pub(crate) use self::console_progress::*; mod progress; +mod tokio_ffi; +pub(crate) use self::tokio_ffi::*; mod scripts; pub(crate) use self::scripts::*; mod sysroot_upgrade; diff --git a/rust/src/main.rs b/rust/src/main.rs index 95f856b21d..16a516d908 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -24,6 +24,27 @@ fn usroverlay(args: &[&str]) -> Result<()> { .context("Failed to execute ostree admin unlock") } +// And now we've done process global initialization, we have a tokio runtime setup; process the command line. +async fn inner_async_main(args: &[&str]) -> Result { + // It is only recently that our main() function is in Rust, calling + // into C++ as a library. As of right now, the only Rust commands + // are hidden, i.e. should not appear in --help. So we just recognize + // those, and if there's something we don't know about, invoke the C++ + // main(). + match args.get(1).copied() { + // Add custom Rust commands here, and also in `libmain.cxx` if user-visible. + Some("countme") => rpmostree_rust::countme::entrypoint(&args).map(|_| 0), + Some("cliwrap") => rpmostree_rust::cliwrap::entrypoint(&args).map(|_| 0), + Some("ex-container") => rpmostree_rust::container::entrypoint(&args).await, + // The `unlock` is a hidden alias for "ostree CLI compatibility" + Some("usroverlay") | Some("unlock") => usroverlay(&args).map(|_| 0), + _ => { + // Otherwise fall through to C++ main(). + Ok(rpmostree_rust::ffi::rpmostree_main(&args)?) + } + } +} + /// The real main function returns a `Result<>`. fn inner_main() -> Result { if std::env::var("RPMOSTREE_GDB_HOOK").is_ok() { @@ -54,23 +75,11 @@ fn inner_main() -> Result { .collect(); let args = args?; let args: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - // It is only recently that our main() function is in Rust, calling - // into C++ as a library. As of right now, the only Rust commands - // are hidden, i.e. should not appear in --help. So we just recognize - // those, and if there's something we don't know about, invoke the C++ - // main(). - match args.get(1).copied() { - // Add custom Rust commands here, and also in `libmain.cxx` if user-visible. - Some("countme") => rpmostree_rust::countme::entrypoint(&args).map(|_| 0), - Some("cliwrap") => rpmostree_rust::cliwrap::entrypoint(&args).map(|_| 0), - Some("ex-container") => rpmostree_rust::container::entrypoint(&args).map(|_| 0), - // The `unlock` is a hidden alias for "ostree CLI compatibility" - Some("usroverlay") | Some("unlock") => usroverlay(&args).map(|_| 0), - _ => { - // Otherwise fall through to C++ main(). - Ok(rpmostree_rust::ffi::rpmostree_main(&args)?) - } - } + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .context("Failed to build tokio runtime")? + .block_on(async { inner_async_main(&args).await }) } fn print_error(e: anyhow::Error) { diff --git a/rust/src/sysroot_upgrade.rs b/rust/src/sysroot_upgrade.rs index 6bdfa5316d..80b7408247 100644 --- a/rust/src/sysroot_upgrade.rs +++ b/rust/src/sysroot_upgrade.rs @@ -4,9 +4,9 @@ use crate::cxxrsutil::*; use crate::ffi::ContainerImport; -use anyhow::{Context, Result}; use std::convert::TryInto; use std::pin::Pin; +use tokio::runtime::Handle; /// Import ostree commit in container image using ostree-rs-ext's API. pub(crate) fn import_container( @@ -16,7 +16,7 @@ pub(crate) fn import_container( // TODO: take a GCancellable and monitor it, and drop the import task (which is how async cancellation works in Rust). let repo = repo.gobj_wrap(); let imgref = imgref.as_str().try_into()?; - let imported = build_runtime()? + let imported = Handle::current() .block_on(async { ostree_ext::container::import(&repo, &imgref, None).await })?; Ok(Box::new(ContainerImport { ostree_commit: imported.ostree_commit, @@ -27,14 +27,7 @@ pub(crate) fn import_container( /// Fetch the image digest for `imgref` using ostree-rs-ext's API. pub(crate) fn fetch_digest(imgref: String) -> CxxResult { let imgref = imgref.as_str().try_into()?; - let digest = build_runtime()? + let digest = Handle::current() .block_on(async { ostree_ext::container::fetch_manifest_info(&imgref).await })?; Ok(digest.manifest_digest) } - -fn build_runtime() -> Result { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .context("Failed to build tokio runtime") -} diff --git a/rust/src/tokio_ffi.rs b/rust/src/tokio_ffi.rs new file mode 100644 index 0000000000..62d9600a9e --- /dev/null +++ b/rust/src/tokio_ffi.rs @@ -0,0 +1,16 @@ +//! Helpers to bridge tokio to C++ + +// SPDX-License-Identifier: Apache-2.0 OR MIT + +pub(crate) struct TokioHandle(tokio::runtime::Handle); +pub(crate) struct TokioEnterGuard<'a>(tokio::runtime::EnterGuard<'a>); + +pub(crate) fn tokio_handle_get() -> Box { + Box::new(TokioHandle(tokio::runtime::Handle::current())) +} + +impl TokioHandle { + pub(crate) fn enter(&self) -> Box { + Box::new(TokioEnterGuard(self.0.enter())) + } +} diff --git a/src/daemon/rpmostreed-transaction.cxx b/src/daemon/rpmostreed-transaction.cxx index 823433e9a3..feff3ade86 100644 --- a/src/daemon/rpmostreed-transaction.cxx +++ b/src/daemon/rpmostreed-transaction.cxx @@ -23,6 +23,7 @@ #include #include #include +#include #include "rpmostreed-transaction.h" #include "rpmostreed-errors.h" @@ -47,6 +48,8 @@ struct _RpmostreedTransactionPrivate { char *agent_id; char *sd_unit; + std::optional> tokio_handle; + gint64 last_progress_journal; gboolean redirect_output; @@ -341,6 +344,8 @@ transaction_execute_thread (GTask *task, * anyways. */ g_main_context_push_thread_default (mctx); + // Further, we join the main Tokio async runtime. + auto guard = (*priv->tokio_handle)->enter(); if (clazz->execute != NULL) { @@ -512,6 +517,8 @@ transaction_finalize (GObject *object) if (priv->watch_id > 0) g_bus_unwatch_name (priv->watch_id); + priv->tokio_handle.~optional(); + g_hash_table_destroy (priv->peer_connections); g_free (priv->client_description); @@ -823,6 +830,8 @@ rpmostreed_transaction_init (RpmostreedTransaction *self) g_direct_equal, g_object_unref, NULL); + + self->priv->tokio_handle = rpmostreecxx::tokio_handle_get(); } gboolean