Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Pass an executor through the Configuration #4688

Merged
merged 3 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bin/node-template/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ pub fn run<I, T, E>(args: I, exit: E, version: VersionInfo) -> error::Result<()>
type Config<T> = Configuration<(), T>;
match parse_and_prepare::<NoCustom, NoCustom, _>(&version, "substrate-node", args) {
ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
|exit, _cli_args, _custom_args, config: Config<_>| {
|exit, _cli_args, _custom_args, mut config: Config<_>| {
info!("{}", version.name);
info!(" version {}", config.full_version());
info!(" by {}, 2017, 2018", version.author);
info!("Chain specification: {}", config.chain_spec.name());
info!("Node name: {}", config.name);
info!("Roles: {}", display_role(&config));
let runtime = Runtime::new().map_err(|e| format!("{:?}", e))?;
config.tasks_executor = {
let runtime_handle = runtime.handle().clone();
Some(Box::new(move |fut| { runtime_handle.spawn(fut); }))
};
match config.roles {
ServiceRoles::LIGHT => run_until_exit(
runtime,
Expand Down
6 changes: 5 additions & 1 deletion bin/node/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub fn run<I, T, E>(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re

match parse_and_prepare::<CustomSubcommands, NoCustom, _>(&version, "substrate-node", args) {
ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
|exit, _cli_args, _custom_args, config: Config<_, _>| {
|exit, _cli_args, _custom_args, mut config: Config<_, _>| {
info!("{}", version.name);
info!(" version {}", config.full_version());
info!(" by Parity Technologies, 2017-2019");
Expand All @@ -110,6 +110,10 @@ pub fn run<I, T, E>(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re
.threaded_scheduler()
.build()
.map_err(|e| format!("{:?}", e))?;
config.tasks_executor = {
let runtime_handle = runtime.handle().clone();
Some(Box::new(move |fut| { runtime_handle.spawn(fut); }))
};
match config.roles {
ServiceRoles::LIGHT => run_until_exit(
runtime,
Expand Down
10 changes: 10 additions & 0 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use sp_consensus::import_queue::ImportQueue;
use futures::{
Future, FutureExt, StreamExt,
channel::mpsc,
executor::ThreadPoolBuilder,
future::{select, ready}
};
use sc_keystore::{Store as Keystore};
Expand Down Expand Up @@ -1147,6 +1148,15 @@ ServiceBuilder<
essential_failed_rx,
to_spawn_tx,
to_spawn_rx,
tasks_executor: if let Some(exec) = config.tasks_executor {
Some(exec)
} else {
ThreadPoolBuilder::new()
.name_prefix("main-task-")
.create()
.ok()
.map(|tp| Box::new(move |fut| tp.spawn_ok(fut)) as Box<_>)
},
to_poll: Vec::new(),
rpc_handlers,
_rpc: rpc,
Expand Down
7 changes: 5 additions & 2 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ pub use sc_client_db::{kvdb::KeyValueDB, PruningMode};
pub use sc_network::config::{ExtTransport, NetworkConfiguration, Roles};
pub use sc_executor::WasmExecutionMethod;

use std::{path::{PathBuf, Path}, net::SocketAddr, sync::Arc};
use std::{future::Future, path::{PathBuf, Path}, pin::Pin, net::SocketAddr, sync::Arc};
pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions;
use sc_chain_spec::{ChainSpec, RuntimeGenesis, Extension, NoExtension};
use sp_core::crypto::Protected;
use target_info::Target;
use sc_telemetry::TelemetryEndpoints;

/// Service configuration.
#[derive(Clone)]
pub struct Configuration<C, G, E = NoExtension> {
/// Implementation name
pub impl_name: &'static str,
Expand All @@ -39,6 +38,9 @@ pub struct Configuration<C, G, E = NoExtension> {
pub impl_commit: &'static str,
/// Node roles.
pub roles: Roles,
/// How to spawn background tasks.
/// If `None`, will try to use a threads pool.
pub tasks_executor: Option<Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>>,
/// Extrinsic pool configuration.
pub transaction_pool: TransactionPoolOptions,
/// Network configuration.
Expand Down Expand Up @@ -160,6 +162,7 @@ impl<C, G, E> Configuration<C, G, E> where
config_dir: config_dir.clone(),
name: Default::default(),
roles: Roles::FULL,
tasks_executor: None,
transaction_pool: Default::default(),
network: Default::default(),
keystore: KeystoreConfig::None,
Expand Down
16 changes: 6 additions & 10 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
to_spawn_tx: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: mpsc::UnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// How to spawn background tasks.
tasks_executor: Option<Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>>,
/// List of futures to poll from `poll`.
/// If spawning a background task is not possible, we instead push the task into this `Vec`.
/// The elements must then be polled manually.
Expand Down Expand Up @@ -322,16 +324,10 @@ impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
}

while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
// TODO: Update to tokio 0.2 when libp2p get switched to std futures (#4383)
let executor = tokio_executor::DefaultExecutor::current();
use futures01::future::Executor;
if let Err(err) = executor.execute(task_to_spawn.unit_error().compat()) {
debug!(
target: "service",
"Failed to spawn background task: {:?}; falling back to manual polling",
err
);
this.to_poll.push(Box::pin(err.into_future().compat().map(drop)));
if let Some(tasks_executor) = this.tasks_executor.as_ref() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this logic? The builder creates a thread builder anyway and browser gives us a spawner as well. This sounds like we can remove the option here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user leaves None in the Configuration, we try to create a ThreadPool and this is the path that is reached if spawning the threads failed.

In practice, without this code we will get a panic in the browser if the user forgets to tweak the configuration. I guess it's indeed debatable whether it's worth the additional code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should go the way of checking at Service creation that a task_executor is provided and otherwise an error is returned. I know that is not the best solution, but IMHO the best we can do for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed with @bkchr.

tasks_executor(task_to_spawn);
} else {
this.to_poll.push(Box::pin(task_to_spawn.map(drop)));
}
}

Expand Down
1 change: 1 addition & 0 deletions client/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ fn node_config<G, E: Clone> (
impl_version: "0.1",
impl_commit: "",
roles: role,
tasks_executor: None,
transaction_pool: Default::default(),
network: network_config,
keystore: KeystoreConfig::Path {
Expand Down
3 changes: 3 additions & 0 deletions utils/browser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ where
allow_private_ipv4: true,
enable_mdns: false,
};
config.tasks_executor = Some(Box::new(move |fut| {
wasm_bindgen_futures::spawn_local(fut)
}));
config.telemetry_external_transport = Some(transport);
config.roles = Roles::LIGHT;
config.name = format!("{} (Browser)", name);
Expand Down