Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Fix so FilterManager drops its resources
Browse files Browse the repository at this point in the history
* Introduced an AtomicBool flag in FilterManager to cancel the `Decryption Worker Thread`
* Added some very basic test to faulty arguments
  • Loading branch information
niklasad1 committed Apr 5, 2018
1 parent e02b8c1 commit a76eb9b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 14 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion whisper/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ panic_hook = { path = "../../util/panic_hook" }
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
log = "0.4"
log = "0.3"

[[bin]]
name = "whisper-cli"
Expand Down
60 changes: 56 additions & 4 deletions whisper/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ Options:
--whisper-pool-size SIZE Specify Whisper pool size [default: 10].
-p, --port PORT Specify which RPC port to use [default: 8545].
-a, --address ADDRESS Specify which address to use [default: 127.0.0.1].
-l, --log LEVEL Specify log level to use [default: Error].
-l, --log LEVEL Specify the logging level. Must conform to the same format as RUST_LOG [default: Error].
-h, --help Display this message and exit.
"#;

#[derive(Clone, Default)]
Expand Down Expand Up @@ -129,6 +128,7 @@ enum Error {
JsonRpc(jsonrpc_core::Error),
Network(net::Error),
SockAddr(std::net::AddrParseError),
Logger(String),
}

impl From<std::net::AddrParseError> for Error {
Expand Down Expand Up @@ -161,6 +161,12 @@ impl From<jsonrpc_core::Error> for Error {
}
}

impl From<String> for Error {
fn from(err: String) -> Self {
Error::Logger(err)
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Expand All @@ -169,6 +175,7 @@ impl fmt::Display for Error {
Error::Io(ref e) => write!(f, "IoError: {}", e),
Error::JsonRpc(ref e) => write!(f, "JsonRpcError: {:?}", e),
Error::Network(ref e) => write!(f, "NetworkError: {}", e),
Error::Logger(ref e) => write!(f, "LoggerError: {}", e),
}
}
}
Expand All @@ -195,7 +202,7 @@ fn execute<S, I>(command: I) -> Result<(), Error> where I: IntoIterator<Item=S>,
let pool_size = args.flag_whisper_pool_size * POOL_UNIT;
let url = format!("{}:{}", args.flag_address, args.flag_port);

let _ = set_logger(args.flag_log);
initialize_logger(args.flag_log)?;
info!(target: "whisper-cli", "start");

// Filter manager that will dispatch `decryption tasks`
Expand Down Expand Up @@ -238,10 +245,55 @@ fn execute<S, I>(command: I) -> Result<(), Error> where I: IntoIterator<Item=S>,
Ok(())
}

fn set_logger(log_level: String) -> Result<(), String> {
fn initialize_logger(log_level: String) -> Result<(), String> {
let mut l = log::Config::default();
l.mode = Some(log_level);
log::setup_log(&l)?;
Ok(())
}


#[cfg(test)]
mod tests {
use super::execute;

#[test]
fn invalid_argument() {
let command = vec!["whisper-cli", "--foo=12"]
.into_iter()
.map(Into::into)
.collect::<Vec<String>>();

assert!(execute(command).is_err());
}

#[test]
fn privileged_port() {
let command = vec!["whisper-cli", "--port=3"]
.into_iter()
.map(Into::into)
.collect::<Vec<String>>();

assert!(execute(command).is_err());
}

#[test]
fn invalid_ip_address() {
let command = vec!["whisper-cli", "--address=x.x.x.x"]
.into_iter()
.map(Into::into)
.collect::<Vec<String>>();

assert!(execute(command).is_err());
}

#[test]
fn invalid_whisper_pool_size() {
let command = vec!["whisper-cli", "--whisper-pool-size=-100000000000000000000000000000000000000"]
.into_iter()
.map(Into::into)
.collect::<Vec<String>>();

assert!(execute(command).is_err());
}
}
28 changes: 22 additions & 6 deletions whisper/src/rpc/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
//! Abstraction over filters which works with polling and subscription.
use std::collections::HashMap;
use std::sync::{mpsc, Arc};
use std::thread;
use std::{sync::{Arc, atomic, atomic::AtomicBool, mpsc}, thread};

use ethereum_types::{H256, H512};
use ethkey::Public;
Expand All @@ -27,8 +26,7 @@ use parking_lot::{Mutex, RwLock};
use rand::{Rng, OsRng};

use message::{Message, Topic};
use super::key_store::KeyStore;
use super::types::{self, FilterItem, HexEncode};
use super::{key_store::KeyStore, types::{self, FilterItem, HexEncode}};

/// Kinds of filters,
#[derive(PartialEq, Eq, Clone, Copy)]
Expand All @@ -53,22 +51,37 @@ pub struct Manager {
filters: RwLock<HashMap<H256, FilterEntry>>,
tx: Mutex<mpsc::Sender<Box<Fn() + Send>>>,
join: Option<thread::JoinHandle<()>>,
exit: Arc<AtomicBool>,
}

impl Manager {
/// Create a new filter manager that will dispatch decryption tasks onto
/// the given thread pool.
pub fn new() -> ::std::io::Result<Self> {
let (tx, rx) = mpsc::channel::<Box<Fn() + Send>>();
let exit = Arc::new(AtomicBool::new(false));
let e = exit.clone();

let join_handle = thread::Builder::new()
.name("Whisper Decryption Worker".to_string())
.spawn(move || for item in rx { (item)() })?;
.spawn(move || {
trace!(target: "parity_whisper", "Start decryption worker");
loop {
if exit.load(atomic::Ordering::Acquire) {
break;
}
if let Ok(item) = rx.try_recv() {
item();
}
}
})?;

Ok(Manager {
key_store: Arc::new(RwLock::new(KeyStore::new()?)),
filters: RwLock::new(HashMap::new()),
tx: Mutex::new(tx),
join: Some(join_handle),
exit: e,
})
}

Expand Down Expand Up @@ -103,7 +116,7 @@ impl Manager {
}

/// Insert new subscription filter. Generates a secure ID and sends it to
/// the
/// the subscriber
pub fn insert_subscription(&self, filter: Filter, sub: Subscriber<FilterItem>)
-> Result<(), &'static str>
{
Expand Down Expand Up @@ -180,9 +193,12 @@ impl ::net::MessageHandler for Arc<Manager> {

impl Drop for Manager {
fn drop(&mut self) {
trace!(target: "parity_whisper", "waiting to drop FilterManager");
self.exit.store(true, atomic::Ordering::Release);
if let Some(guard) = self.join.take() {
let _ = guard.join();
}
trace!(target: "parity_whisper", "FilterManager dropped");
}
}

Expand Down

0 comments on commit a76eb9b

Please sign in to comment.