From 6ee1244e2d018333746d82131222308e0d802e6a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 21 Jan 2020 13:06:15 +0100 Subject: [PATCH] Pass an executor through the Configuration (#4688) * Pass an executor through the Configuration * Make tasks_executor mandatory * Fix tests --- bin/node-template/src/cli.rs | 6 +++++- bin/node/cli/src/cli.rs | 6 +++++- client/service/src/builder.rs | 6 +++++- client/service/src/config.rs | 6 ++++-- client/service/src/error.rs | 3 +++ client/service/src/lib.rs | 25 ++++--------------------- client/service/test/src/lib.rs | 21 +++++++++++++++++++-- utils/browser/src/lib.rs | 3 +++ 8 files changed, 48 insertions(+), 28 deletions(-) diff --git a/bin/node-template/src/cli.rs b/bin/node-template/src/cli.rs index 44764e5c9db41..fcfd330816cd1 100644 --- a/bin/node-template/src/cli.rs +++ b/bin/node-template/src/cli.rs @@ -18,7 +18,7 @@ pub fn run(args: I, exit: E, version: VersionInfo) -> error::Result<()> type Config = Configuration<(), T>; match parse_and_prepare::(&version, "substrate-node", args) { ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, - |exit, _cli_args, _custom_args, config: Config<_>| { + |exit, _cli_args, _custom_args, mut config: Config<_>| { info!("{}", version.name); info!(" version {}", config.full_version()); info!(" by {}, 2017, 2018", version.author); @@ -26,6 +26,10 @@ pub fn run(args: I, exit: E, version: VersionInfo) -> error::Result<()> info!("Node name: {}", config.name); info!("Roles: {}", display_role(&config)); let runtime = Runtime::new().map_err(|e| format!("{:?}", e))?; + config.tasks_executor = { + let runtime_handle = runtime.handle().clone(); + Some(Box::new(move |fut| { runtime_handle.spawn(fut); })) + }; match config.roles { ServiceRoles::LIGHT => run_until_exit( runtime, diff --git a/bin/node/cli/src/cli.rs b/bin/node/cli/src/cli.rs index 6a2e02efb6209..5ade700513e53 100644 --- a/bin/node/cli/src/cli.rs +++ b/bin/node/cli/src/cli.rs @@ -98,7 +98,7 @@ pub fn run(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re match parse_and_prepare::(&version, "substrate-node", args) { ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, - |exit, _cli_args, _custom_args, config: Config<_, _>| { + |exit, _cli_args, _custom_args, mut config: Config<_, _>| { info!("{}", version.name); info!(" version {}", config.full_version()); info!(" by Parity Technologies, 2017-2019"); @@ -111,6 +111,10 @@ pub fn run(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re .enable_all() .build() .map_err(|e| format!("{:?}", e))?; + config.tasks_executor = { + let runtime_handle = runtime.handle().clone(); + Some(Box::new(move |fut| { runtime_handle.spawn(fut); })) + }; match config.roles { ServiceRoles::LIGHT => run_until_exit( runtime, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 5449b00b061a3..044798701c6e1 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -1147,7 +1147,11 @@ ServiceBuilder< essential_failed_rx, to_spawn_tx, to_spawn_rx, - to_poll: Vec::new(), + tasks_executor: if let Some(exec) = config.tasks_executor { + exec + } else { + return Err(Error::TasksExecutorRequired); + }, rpc_handlers, _rpc: rpc, _telemetry: telemetry, diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 75c626821479d..8a145dec165b4 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -21,7 +21,7 @@ pub use sc_client_db::{kvdb::KeyValueDB, PruningMode}; pub use sc_network::config::{ExtTransport, NetworkConfiguration, Roles}; pub use sc_executor::WasmExecutionMethod; -use std::{path::{PathBuf, Path}, net::SocketAddr, sync::Arc}; +use std::{future::Future, path::{PathBuf, Path}, pin::Pin, net::SocketAddr, sync::Arc}; pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions; use sc_chain_spec::{ChainSpec, RuntimeGenesis, Extension, NoExtension}; use sp_core::crypto::Protected; @@ -29,7 +29,6 @@ use target_info::Target; use sc_telemetry::TelemetryEndpoints; /// Service configuration. -#[derive(Clone)] pub struct Configuration { /// Implementation name pub impl_name: &'static str, @@ -39,6 +38,8 @@ pub struct Configuration { pub impl_commit: &'static str, /// Node roles. pub roles: Roles, + /// How to spawn background tasks. Mandatory, otherwise creating a `Service` will error. + pub tasks_executor: Option + Send>>) + Send>>, /// Extrinsic pool configuration. pub transaction_pool: TransactionPoolOptions, /// Network configuration. @@ -160,6 +161,7 @@ impl Configuration where config_dir: config_dir.clone(), name: Default::default(), roles: Roles::FULL, + tasks_executor: None, transaction_pool: Default::default(), network: Default::default(), keystore: KeystoreConfig::None, diff --git a/client/service/src/error.rs b/client/service/src/error.rs index 6516b1c62c6d6..14b03d7e95de7 100644 --- a/client/service/src/error.rs +++ b/client/service/src/error.rs @@ -40,6 +40,9 @@ pub enum Error { /// Best chain selection strategy is missing. #[display(fmt="Best chain selection strategy (SelectChain) is not provided.")] SelectChainRequired, + /// Tasks executor is missing. + #[display(fmt="Tasks executor hasn't been provided.")] + TasksExecutorRequired, /// Other error. Other(String), } diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 87327d0967583..c1b87e4491904 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -38,7 +38,7 @@ use parking_lot::Mutex; use sc_client::Client; use exit_future::Signal; use futures::{ - Future, FutureExt, Stream, StreamExt, TryFutureExt, + Future, FutureExt, Stream, StreamExt, future::select, channel::mpsc, compat::*, sink::SinkExt, @@ -95,10 +95,8 @@ pub struct Service { to_spawn_tx: mpsc::UnboundedSender + Send>>>, /// Receiver for futures that must be spawned as background tasks. to_spawn_rx: mpsc::UnboundedReceiver + Send>>>, - /// List of futures to poll from `poll`. - /// If spawning a background task is not possible, we instead push the task into this `Vec`. - /// The elements must then be polled manually. - to_poll: Vec + Send>>>, + /// How to spawn background tasks. + tasks_executor: Box + Send>>) + Send>, rpc_handlers: sc_rpc_server::RpcHandler, _rpc: Box, _telemetry: Option, @@ -322,22 +320,7 @@ impl Future for } while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) { - // TODO: Update to tokio 0.2 when libp2p get switched to std futures (#4383) - let executor = tokio_executor::DefaultExecutor::current(); - use futures01::future::Executor; - if let Err(err) = executor.execute(task_to_spawn.unit_error().compat()) { - debug!( - target: "service", - "Failed to spawn background task: {:?}; falling back to manual polling", - err - ); - this.to_poll.push(Box::pin(err.into_future().compat().map(drop))); - } - } - - // Polling all the `to_poll` futures. - while let Some(pos) = this.to_poll.iter_mut().position(|t| Pin::new(t).poll(cx).is_ready()) { - let _ = this.to_poll.remove(pos); + (this.tasks_executor)(task_to_spawn); } // The service future never ends. diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index dd6395e9c6228..961c8d98ffd09 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -19,9 +19,11 @@ use std::iter; use std::sync::{Arc, Mutex, MutexGuard}; use std::net::Ipv4Addr; +use std::pin::Pin; use std::time::Duration; use log::info; use futures01::{Future, Stream, Poll}; +use futures::{FutureExt as _, TryFutureExt as _}; use tempfile::TempDir; use tokio::{runtime::Runtime, prelude::FutureExt}; use tokio::timer::Interval; @@ -131,6 +133,7 @@ fn node_config ( index: usize, spec: &ChainSpec, role: Roles, + tasks_executor: Box + Send>>) + Send>, key_seed: Option, base_port: u16, root: &TempDir, @@ -172,6 +175,7 @@ fn node_config ( impl_version: "0.1", impl_commit: "", roles: role, + tasks_executor: Some(tasks_executor), transaction_pool: Default::default(), network: network_config, keystore: KeystoreConfig::Path { @@ -251,10 +255,15 @@ impl TestNet where let executor = self.runtime.executor(); for (key, authority) in authorities { + let tasks_executor = { + let executor = executor.clone(); + Box::new(move |fut: Pin + Send>>| executor.spawn(fut.unit_error().compat())) + }; let node_config = node_config( self.nodes, &self.chain_spec, Roles::AUTHORITY, + tasks_executor, Some(key), self.base_port, &temp, @@ -270,7 +279,11 @@ impl TestNet where } for full in full { - let node_config = node_config(self.nodes, &self.chain_spec, Roles::FULL, None, self.base_port, &temp); + let tasks_executor = { + let executor = executor.clone(); + Box::new(move |fut: Pin + Send>>| executor.spawn(fut.unit_error().compat())) + }; + let node_config = node_config(self.nodes, &self.chain_spec, Roles::FULL, tasks_executor, None, self.base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); let (service, user_data) = full(node_config).expect("Error creating test node service"); let service = SyncService::from(service); @@ -282,7 +295,11 @@ impl TestNet where } for light in light { - let node_config = node_config(self.nodes, &self.chain_spec, Roles::LIGHT, None, self.base_port, &temp); + let tasks_executor = { + let executor = executor.clone(); + Box::new(move |fut: Pin + Send>>| executor.spawn(fut.unit_error().compat())) + }; + let node_config = node_config(self.nodes, &self.chain_spec, Roles::LIGHT, tasks_executor, None, self.base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); let service = SyncService::from(light(node_config).expect("Error creating test node service")); diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs index 0dbde57182766..7b7fda45839ee 100644 --- a/utils/browser/src/lib.rs +++ b/utils/browser/src/lib.rs @@ -52,6 +52,9 @@ where allow_private_ipv4: true, enable_mdns: false, }; + config.tasks_executor = Some(Box::new(move |fut| { + wasm_bindgen_futures::spawn_local(fut) + })); config.telemetry_external_transport = Some(transport); config.roles = Roles::LIGHT; config.name = format!("{} (Browser)", name);