Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
use ThreadPool to execute spawn_worker(fn) (#3836)
Browse files Browse the repository at this point in the history
* use ThreadPool to spawn_worker()

* use ThreadPool to implement spawn_worker(fn)

* use ThreadPool to implement spawn_worker(f)

* update [dependencies] threadpool and num_cpus version

*  rm 'extern crate num_cpus'

* cargo.lock update

*  merge the newest cargo.lock

* Update Cargo.lock

* use Mutex to wrap OffchainWorkers.thread_pool

* format use crate

* use parking_lot::Mutex instead of std::sync::Mutex
  • Loading branch information
Lawliet-Chan authored and tomusdrw committed Oct 21, 2019
1 parent c497fcb commit b7627c4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/offchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ futures-timer = "0.4.0"
hyper = "0.12.35"
hyper-tls = "0.3.2"
log = "0.4.8"
threadpool = "1.7"
num_cpus = "1.10"
offchain-primitives = { package = "substrate-offchain-primitives", path = "./primitives" }
codec = { package = "parity-scale-codec", version = "1.0.0", features = ["derive"] }
parking_lot = "0.9.0"
Expand Down
29 changes: 16 additions & 13 deletions core/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use std::{
sync::Arc,
};

use parking_lot::Mutex;
use threadpool::ThreadPool;
use client::runtime_api::ApiExt;
use futures::future::Future;
use log::{debug, warn};
Expand All @@ -58,6 +60,7 @@ pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
client: Arc<Client>,
db: Storage,
_block: PhantomData<Block>,
thread_pool: Mutex<ThreadPool>,
}

impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
Expand All @@ -67,6 +70,7 @@ impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Blo
client,
db,
_block: PhantomData,
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
}
}
}
Expand Down Expand Up @@ -116,7 +120,7 @@ impl<Client, Storage, Block> OffchainWorkers<
debug!("Spawning offchain workers at {:?}", at);
let number = *number;
let client = self.client.clone();
spawn_worker(move || {
self.spawn_worker(move || {
let runtime = client.runtime_api();
let api = Box::new(api);
debug!("Running offchain workers at {:?}", at);
Expand All @@ -134,19 +138,18 @@ impl<Client, Storage, Block> OffchainWorkers<
futures::future::Either::Right(futures::future::ready(()))
}
}
}

/// Spawns a new offchain worker.
///
/// We spawn offchain workers for each block in a separate thread,
/// since they can run for a significant amount of time
/// in a blocking fashion and we don't want to block the runtime.
///
/// Note that we should avoid that if we switch to future-based runtime in the future,
/// alternatively:
/// TODO [ToDr] (#1458) we can consider using a thread pool instead.
fn spawn_worker(f: impl FnOnce() -> () + Send + 'static) {
std::thread::spawn(f);
/// Spawns a new offchain worker.
///
/// We spawn offchain workers for each block in a separate thread,
/// since they can run for a significant amount of time
/// in a blocking fashion and we don't want to block the runtime.
///
/// Note that we should avoid that if we switch to future-based runtime in the future,
/// alternatively:
fn spawn_worker(&self, f: impl FnOnce() -> () + Send + 'static) {
self.thread_pool.lock().execute(f);
}
}

#[cfg(test)]
Expand Down

0 comments on commit b7627c4

Please sign in to comment.