Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
handle exit and avoid threads hanging (#137)
Browse files Browse the repository at this point in the history
* barrier on starting network

* handle exit better

* give consensus service its own internal exit signal

* update comment

* remove stop_notifications and fix build
  • Loading branch information
rphmeier authored and arkpar committed Apr 18, 2018
1 parent 98d2777 commit 763787c
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 62 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
T: Into<std::ffi::OsString> + Clone,
{
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let exit = {
// can't use signal directly here because CtrlC takes only `Fn`.
let (exit_send, exit) = mpsc::channel(1);
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});

exit
};

let yaml = load_yaml!("./cli.yml");
let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) {
Expand Down Expand Up @@ -140,11 +149,6 @@ pub fn run<I, T>(args: I) -> error::Result<()> where

informant::start(&service, core.handle());

let (exit_send, exit) = mpsc::channel(1);
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});

let _rpc_servers = {
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?;
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?;
Expand Down
1 change: 1 addition & 0 deletions polkadot/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ tokio-core = "0.1.12"
ed25519 = { path = "../../substrate/ed25519" }
error-chain = "0.11"
log = "0.3"
exit-future = "0.1"
polkadot-api = { path = "../api" }
polkadot-collator = { path = "../collator" }
polkadot-primitives = { path = "../primitives" }
Expand Down
1 change: 1 addition & 0 deletions polkadot/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ extern crate substrate_primitives as primitives;
extern crate substrate_runtime_support as runtime_support;
extern crate substrate_network;

extern crate exit_future;
extern crate tokio_core;
extern crate substrate_keyring;
extern crate substrate_client as client;
Expand Down
91 changes: 58 additions & 33 deletions polkadot/consensus/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,6 @@ impl<E> Sink for BftSink<E> {
}
}

/// Consensus service. Starts working when created.
pub struct Service {
// Background thread driving the consensus event loop; `Option` so the
// handle can be taken and joined exactly once when the service stops.
thread: Option<thread::JoinHandle<()>>,
}

struct Network(Arc<net::ConsensusService>);

fn start_bft<F, C>(
Expand Down Expand Up @@ -259,16 +254,24 @@ fn start_bft<F, C>(
}
}

/// Consensus service. Starts working when created.
pub struct Service {
// Background thread driving the consensus event loop; `Option` so the
// handle can be taken and joined exactly once in `Drop`.
thread: Option<thread::JoinHandle<()>>,
// Signal that resolves the thread's internal `exit` future, telling the
// event loop to shut down; `Option` so `Drop` can take and fire it once
// before joining the thread.
exit_signal: Option<::exit_future::Signal>,
}

impl Service {
/// Create and start a new instance.
pub fn new<C>(
client: Arc<C>,
network: Arc<net::ConsensusService>,
transaction_pool: Arc<Mutex<TransactionPool>>,
key: ed25519::Pair
key: ed25519::Pair,
) -> Service
where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
where
C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static,
{
let (signal, exit) = ::exit_future::signal();
let thread = thread::spawn(move || {
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let key = Arc::new(key);
Expand All @@ -281,52 +284,74 @@ impl Service {
let messages = SharedMessageCollection::new();
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));

let handle = core.handle();
let notifications = client.import_notification_stream().for_each(|notification| {
if notification.is_new_best {
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
}
Ok(())
});
let notifications = {
let handle = core.handle();
let network = network.clone();
let client = client.clone();
let bft_service = bft_service.clone();
let messages = messages.clone();

let interval = reactor::Interval::new_at(Instant::now() + Duration::from_millis(TIMER_DELAY_MS), Duration::from_millis(TIMER_INTERVAL_MS), &handle).unwrap();
client.import_notification_stream().for_each(move |notification| {
if notification.is_new_best {
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
}
Ok(())
})
};

let interval = reactor::Interval::new_at(
Instant::now() + Duration::from_millis(TIMER_DELAY_MS),
Duration::from_millis(TIMER_INTERVAL_MS),
&core.handle(),
).expect("it is always possible to create an interval with valid params");
let mut prev_best = match client.best_block_header() {
Ok(header) => header.blake2_256(),
Err(e) => {
warn!("Cant's start consensus service. Error reading best block header: {:?}", e);
return;
}
};
let c = client.clone();
let s = bft_service.clone();
let n = network.clone();
let m = messages.clone();
let handle = core.handle();
let timed = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.blake2_256();
m.collect_garbage();
if hash == prev_best {
debug!("Starting consensus round after a timeout");
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());

let timed = {
let c = client.clone();
let s = bft_service.clone();
let n = network.clone();
let m = messages.clone();
let handle = core.handle();

interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.blake2_256();
m.collect_garbage();
if hash == prev_best {
debug!("Starting consensus round after a timeout");
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
}
prev_best = hash;
}
prev_best = hash;
}
Ok(())
});
Ok(())
})
};

core.handle().spawn(notifications);
core.handle().spawn(timed);
if let Err(e) = core.run(notifications) {
if let Err(e) = core.run(exit) {
debug!("BFT event loop error {:?}", e);
}
});
Service {
thread: Some(thread)
thread: Some(thread),
exit_signal: Some(signal),
}
}
}

impl Drop for Service {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
}

if let Some(thread) = self.thread.take() {
thread.join().expect("The service thread has panicked");
}
Expand Down
1 change: 1 addition & 0 deletions polkadot/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ tokio-timer = "0.1.2"
error-chain = "0.11"
log = "0.3"
tokio-core = "0.1.12"
exit-future = "0.1"
ed25519 = { path = "../../substrate/ed25519" }
polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" }
Expand Down
65 changes: 46 additions & 19 deletions polkadot/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ extern crate polkadot_api;
extern crate polkadot_consensus as consensus;
extern crate polkadot_transaction_pool as transaction_pool;
extern crate polkadot_keystore as keystore;
extern crate substrate_client as client;
extern crate substrate_runtime_io as runtime_io;
extern crate substrate_primitives as primitives;
extern crate substrate_network as network;
extern crate substrate_codec as codec;
extern crate substrate_executor;

extern crate exit_future;
extern crate tokio_core;
extern crate substrate_client as client;

#[macro_use]
extern crate error_chain;
Expand Down Expand Up @@ -65,6 +66,7 @@ use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyC
use client::{genesis, BlockchainEvents};
use client::in_mem::Backend as InMemory;
use network::ManageNetwork;
use exit_future::Signal;

pub use self::error::{ErrorKind, Error};
pub use config::{Configuration, Role, ChainSpec};
Expand All @@ -77,6 +79,7 @@ pub struct Service {
client: Arc<Client>,
network: Arc<network::Service>,
transaction_pool: Arc<Mutex<TransactionPool>>,
signal: Option<Signal>,
_consensus: Option<consensus::Service>,
}

Expand Down Expand Up @@ -242,6 +245,10 @@ fn local_testnet_config() -> ChainConfig {
impl Service {
/// Creates and register protocol with the network service
pub fn new(mut config: Configuration) -> Result<Service, error::Error> {
use std::sync::Barrier;

let (signal, exit) = ::exit_future::signal();

// Create client
let executor = polkadot_executor::Executor::new();
let mut storage = Default::default();
Expand Down Expand Up @@ -284,7 +291,39 @@ impl Service {
chain: client.clone(),
transaction_pool: transaction_pool_adapter,
};

let network = network::Service::new(network_params)?;
let barrier = ::std::sync::Arc::new(Barrier::new(2));

let thread = {
let client = client.clone();
let network = network.clone();
let txpool = transaction_pool.clone();

let thread_barrier = barrier.clone();
thread::spawn(move || {
network.start_network();

thread_barrier.wait();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = client.import_notification_stream().for_each(move |notification| {
network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*client, &*txpool, notification.hash);

Ok(())
});

core.handle().spawn(events);
if let Err(e) = core.run(exit) {
debug!("Polkadot service event loop shutdown with {:?}", e);
}
debug!("Polkadot service shutdown");
})
};

// wait for the network to start up before starting the consensus
// service.
barrier.wait();

// Spin consensus service if configured
let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR {
Expand All @@ -296,28 +335,12 @@ impl Service {
None
};

let thread_client = client.clone();
let thread_network = network.clone();
let thread_txpool = transaction_pool.clone();
let thread = thread::spawn(move || {
thread_network.start_network();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = thread_client.import_notification_stream().for_each(|notification| {
thread_network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*thread_client, &*thread_txpool, notification.hash);

Ok(())
});
if let Err(e) = core.run(events) {
debug!("Polkadot service event loop shutdown with {:?}", e);
}
debug!("Polkadot service shutdown");
});
Ok(Service {
thread: Some(thread),
client: client,
network: network,
transaction_pool: transaction_pool,
signal: Some(signal),
_consensus: consensus_service,
})
}
Expand Down Expand Up @@ -357,8 +380,12 @@ pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: Head

impl Drop for Service {
fn drop(&mut self) {
self.client.stop_notifications();
self.network.stop_network();

if let Some(signal) = self.signal.take() {
signal.fire();
}

if let Some(thread) = self.thread.take() {
thread.join().expect("The service thread has panicked");
}
Expand Down
5 changes: 0 additions & 5 deletions substrate/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,6 @@ impl<B, E> Client<B, E> where
self.executor.clone()
}

/// Close notification streams.
///
/// Drops every registered import-notification sink so that any
/// outstanding notification streams terminate.
pub fn stop_notifications(&self) {
	let mut sinks = self.import_notification_sinks.lock();
	sinks.clear();
}

/// Get the current set of authorities from storage.
pub fn authorities_at(&self, id: &BlockId) -> error::Result<Vec<AuthorityId>> {
let state = self.state_at(id)?;
Expand Down

0 comments on commit 763787c

Please sign in to comment.