diff --git a/dapps/src/tests/helpers/fetch.rs b/dapps/src/tests/helpers/fetch.rs index dfe523200e4..17b787b2505 100644 --- a/dapps/src/tests/helpers/fetch.rs +++ b/dapps/src/tests/helpers/fetch.rs @@ -34,7 +34,7 @@ impl FetchControl { } pub fn wait_for_requests(&self, len: usize) { - const MAX_TIMEOUT_MS: u64 = 5000; + const MAX_TIMEOUT: time::Duration = time::Duration::from_millis(5000); const ATTEMPTS: u64 = 10; let mut attempts_left = ATTEMPTS; loop { @@ -50,7 +50,7 @@ impl FetchControl { } else { attempts_left -= 1; // Should we handle spurious timeouts better? - thread::park_timeout(time::Duration::from_millis(MAX_TIMEOUT_MS / ATTEMPTS)); + thread::park_timeout(MAX_TIMEOUT / ATTEMPTS); } } } diff --git a/ethcore/light/src/net/load_timer.rs b/ethcore/light/src/net/load_timer.rs index c6e524a4412..592d2b630be 100644 --- a/ethcore/light/src/net/load_timer.rs +++ b/ethcore/light/src/net/load_timer.rs @@ -48,11 +48,11 @@ pub trait SampleStore: Send + Sync { } // get a hardcoded, arbitrarily determined (but intended overestimate) -// of the time in nanoseconds to serve a request of the given kind. +// of the time it takes to serve a request of the given kind. // // TODO: seed this with empirical data. -fn hardcoded_serve_time(kind: Kind) -> u64 { - match kind { +fn hardcoded_serve_time(kind: Kind) -> Duration { + Duration::new(0, match kind { Kind::Headers => 500_000, Kind::HeaderProof => 500_000, Kind::TransactionIndex => 500_000, @@ -63,7 +63,7 @@ fn hardcoded_serve_time(kind: Kind) -> u64 { Kind::Code => 1_500_000, Kind::Execution => 250, // per gas. Kind::Signal => 500_000, - } + }) } /// A no-op store. @@ -114,10 +114,10 @@ impl LoadDistribution { } } - /// Calculate EMA of load in nanoseconds for a specific request kind. + /// Calculate EMA of load for a specific request kind. /// If there is no data for the given request kind, no EMA will be calculated, /// but a hardcoded time will be returned. - pub fn expected_time_ns(&self, kind: Kind) -> u64 { + pub fn expected_time(&self, kind: Kind) -> Duration { let samples = self.samples.read(); samples.get(&kind).and_then(|s| { if s.len() == 0 { return None } @@ -128,7 +128,9 @@ impl LoadDistribution { (alpha * c as f64) + ((1.0 - alpha) * a) }); - Some(ema as u64) + // TODO: use `Duration::from_nanos` once stable (https://github.com/rust-lang/rust/issues/46507) + let ema = ema as u64; + Some(Duration::new(ema / 1_000_000_000, (ema % 1_000_000_000) as u32)) }).unwrap_or_else(move || hardcoded_serve_time(kind)) } @@ -223,12 +225,12 @@ mod tests { #[test] fn hardcoded_before_data() { let dist = LoadDistribution::load(&NullStore); - assert_eq!(dist.expected_time_ns(Kind::Headers), hardcoded_serve_time(Kind::Headers)); + assert_eq!(dist.expected_time(Kind::Headers), hardcoded_serve_time(Kind::Headers)); dist.update(Kind::Headers, Duration::new(0, 100_000), 100); dist.end_period(&NullStore); - assert_eq!(dist.expected_time_ns(Kind::Headers), 1000); + assert_eq!(dist.expected_time(Kind::Headers), Duration::new(0, 1000)); } #[test] @@ -244,7 +246,7 @@ mod tests { sum += x; if i == 0 { continue } - let moving_average = dist.expected_time_ns(Kind::Headers); + let moving_average = dist.expected_time(Kind::Headers); // should be weighted below the maximum entry. let arith_average = (sum as f64 / (i + 1) as f64) as u64; @@ -255,9 +257,9 @@ mod tests { // otherwise, the weight should be below the arithmetic mean because the much // smaller previous values are discounted less. if i == 1 { - assert_eq!(moving_average, arith_average); + assert_eq!(moving_average, Duration::new(0, arith_average)); } else { - assert!(moving_average < arith_average) + assert!(moving_average < Duration::new(0, arith_average)) } } } diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index abdf91ae442..8299773230c 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -61,16 +61,16 @@ pub use self::load_timer::{SampleStore, FileStore}; pub use self::status::{Status, Capabilities, Announcement}; const TIMEOUT: TimerToken = 0; -const TIMEOUT_INTERVAL_MS: u64 = 1000; +const TIMEOUT_INTERVAL: Duration = Duration::from_secs(1); const TICK_TIMEOUT: TimerToken = 1; -const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000; +const TICK_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); const PROPAGATE_TIMEOUT: TimerToken = 2; -const PROPAGATE_TIMEOUT_INTERVAL_MS: u64 = 5000; +const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; -const RECALCULATE_COSTS_INTERVAL_MS: u64 = 60 * 60 * 1000; +const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60); // minimum interval between updates. const UPDATE_INTERVAL: Duration = Duration::from_millis(5000); @@ -369,9 +369,9 @@ impl LightProtocol { let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore)); let load_distribution = LoadDistribution::load(&*sample_store); let flow_params = FlowParams::from_request_times( - |kind| load_distribution.expected_time_ns(kind), + |kind| load_distribution.expected_time(kind), params.config.load_share, - params.config.max_stored_seconds, + Duration::from_secs(params.config.max_stored_seconds), ); LightProtocol { @@ -766,9 +766,9 @@ impl LightProtocol { self.load_distribution.end_period(&*self.sample_store); let new_params = Arc::new(FlowParams::from_request_times( - |kind| self.load_distribution.expected_time_ns(kind), + |kind| self.load_distribution.expected_time(kind), self.config.load_share, - self.config.max_stored_seconds, + Duration::from_secs(self.config.max_stored_seconds), )); *self.flow_params.write() = new_params.clone(); @@ -1080,13 +1080,13 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) { impl NetworkProtocolHandler for LightProtocol { fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { - io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS) + io.register_timer(TIMEOUT, TIMEOUT_INTERVAL) .expect("Error registering sync timer."); - io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS) + io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL) .expect("Error registering sync timer."); - io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL_MS) + io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL) .expect("Error registering sync timer."); - io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL_MS) + io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL) .expect("Error registering request timer interval token."); } diff --git a/ethcore/light/src/net/request_credits.rs b/ethcore/light/src/net/request_credits.rs index c35a2922279..59ecc366224 100644 --- a/ethcore/light/src/net/request_credits.rs +++ b/ethcore/light/src/net/request_credits.rs @@ -235,23 +235,30 @@ impl FlowParams { /// Create new flow parameters from , /// proportion of total capacity which should be given to a peer, - /// and number of seconds of stored capacity a peer can accumulate. - pub fn from_request_times u64>( - request_time_ns: F, + /// and stored capacity a peer can accumulate. + pub fn from_request_times Duration>( + request_time: F, load_share: f64, - max_stored_seconds: u64 + max_stored: Duration ) -> Self { use request::Kind; let load_share = load_share.abs(); let recharge: u64 = 100_000_000; - let max = recharge.saturating_mul(max_stored_seconds); + let max = { + let sec = max_stored.as_secs().saturating_mul(recharge); + let nanos = (max_stored.subsec_nanos() as u64).saturating_mul(recharge) / 1_000_000_000; + sec + nanos + }; let cost_for_kind = |kind| { // how many requests we can handle per second - let ns = request_time_ns(kind); - let second_duration = 1_000_000_000f64 / ns as f64; + let rq_dur = request_time(kind); + let second_duration = { + let as_ns = rq_dur.as_secs() as f64 * 1_000_000_000f64 + rq_dur.subsec_nanos() as f64; + 1_000_000_000f64 / as_ns + }; // scale by share of the load given to this peer. let serve_per_second = second_duration * load_share; diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index f190d6e6ac3..33725d3ce5e 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::path::Path; +use std::time::Duration; use ansi_term::Colour; use io::{IoContext, TimerToken, IoHandler, IoService, IoError}; @@ -140,13 +141,13 @@ struct ClientIoHandler { const CLIENT_TICK_TIMER: TimerToken = 0; const SNAPSHOT_TICK_TIMER: TimerToken = 1; -const CLIENT_TICK_MS: u64 = 5000; -const SNAPSHOT_TICK_MS: u64 = 10000; +const CLIENT_TICK: Duration = Duration::from_secs(5); +const SNAPSHOT_TICK: Duration = Duration::from_secs(10); impl IoHandler for ClientIoHandler { fn initialize(&self, io: &IoContext) { - io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer"); - io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer"); + io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK).expect("Error registering client timer"); + io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK).expect("Error registering snapshot timer"); } fn timeout(&self, _io: &IoContext, timer: TimerToken) { diff --git a/ethcore/src/account_provider/mod.rs b/ethcore/src/account_provider/mod.rs index 74d24e7f50e..c1fadb6e159 100755 --- a/ethcore/src/account_provider/mod.rs +++ b/ethcore/src/account_provider/mod.rs @@ -641,8 +641,8 @@ impl AccountProvider { } /// Unlocks account temporarily with a timeout. - pub fn unlock_account_timed(&self, account: Address, password: String, duration_ms: u32) -> Result<(), Error> { - self.unlock_account(account, password, Unlock::Timed(Instant::now() + Duration::from_millis(duration_ms as u64))) + pub fn unlock_account_timed(&self, account: Address, password: String, duration: Duration) -> Result<(), Error> { + self.unlock_account(account, password, Unlock::Timed(Instant::now() + duration)) } /// Checks if given account is unlocked @@ -837,7 +837,7 @@ impl AccountProvider { #[cfg(test)] mod tests { use super::{AccountProvider, Unlock, DappId}; - use std::time::Instant; + use std::time::{Duration, Instant}; use ethstore::ethkey::{Generator, Random, Address}; use ethstore::{StoreAccountRef, Derivation}; use ethereum_types::H256; @@ -941,8 +941,8 @@ mod tests { let kp = Random.generate().unwrap(); let ap = AccountProvider::transient_provider(); assert!(ap.insert_account(kp.secret().clone(), "test").is_ok()); - assert!(ap.unlock_account_timed(kp.address(), "test1".into(), 60000).is_err()); - assert!(ap.unlock_account_timed(kp.address(), "test".into(), 60000).is_ok()); + assert!(ap.unlock_account_timed(kp.address(), "test1".into(), Duration::from_secs(60)).is_err()); + assert!(ap.unlock_account_timed(kp.address(), "test".into(), Duration::from_secs(60)).is_ok()); assert!(ap.sign(kp.address(), None, Default::default()).is_ok()); ap.unlocked.write().get_mut(&StoreAccountRef::root(kp.address())).unwrap().unlock = Unlock::Timed(Instant::now()); assert!(ap.sign(kp.address(), None, Default::default()).is_err()); diff --git a/ethcore/src/engines/authority_round/mod.rs b/ethcore/src/engines/authority_round/mod.rs index 0acde347e2b..ebc235aa637 100644 --- a/ethcore/src/engines/authority_round/mod.rs +++ b/ethcore/src/engines/authority_round/mod.rs @@ -19,7 +19,7 @@ use std::fmt; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::{Weak, Arc}; -use std::time::{UNIX_EPOCH, Duration}; +use std::time::{UNIX_EPOCH, SystemTime, Duration}; use std::collections::{BTreeMap, HashSet}; use std::iter::FromIterator; @@ -536,6 +536,7 @@ fn verify_timestamp(step: &Step, header_step: usize) -> Result<(), BlockError> { // NOTE This error might be returned only in early stage of verification (Stage 1). // Returning it further won't recover the sync process. trace!(target: "engine", "verify_timestamp: block too early"); + let oob = oob.map(|n| SystemTime::now() + Duration::from_secs(n)); Err(BlockError::TemporarilyInvalid(oob).into()) }, Ok(_) => Ok(()), @@ -694,8 +695,8 @@ const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; impl IoHandler<()> for TransitionHandler { fn initialize(&self, io: &IoContext<()>) { if let Some(engine) = self.engine.upgrade() { - let remaining = engine.step.duration_remaining(); - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, remaining.as_millis()) + let remaining = engine.step.duration_remaining().as_millis(); + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining)) .unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e)) } } @@ -711,7 +712,7 @@ impl IoHandler<()> for TransitionHandler { } let next_run_at = engine.step.duration_remaining().as_millis() >> 2; - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, next_run_at) + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at)) .unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e)) } } diff --git a/ethcore/src/engines/transition.rs b/ethcore/src/engines/transition.rs index dc745b6e398..a0469b62498 100644 --- a/ethcore/src/engines/transition.rs +++ b/ethcore/src/engines/transition.rs @@ -51,8 +51,7 @@ impl TransitionHandler where S: Sync + Send + Clone { pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; fn set_timeout(io: &IoContext, timeout: Duration) { - let ms = timeout.as_secs() * 1_000 + timeout.subsec_nanos() as u64 / 1_000_000; - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, ms) + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, timeout) .unwrap_or_else(|e| warn!(target: "engine", "Failed to set consensus step timeout: {}.", e)) } diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index aae22411682..44a3e2e1ff8 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -17,6 +17,7 @@ //! General error types for use in ethcore. use std::fmt; +use std::time::SystemTime; use kvdb; use ethereum_types::{H256, U256, Address, Bloom}; use util_error::UtilError; @@ -81,9 +82,9 @@ pub enum BlockError { /// Receipts trie root header field is invalid. InvalidReceiptsRoot(Mismatch), /// Timestamp header field is invalid. - InvalidTimestamp(OutOfBounds), + InvalidTimestamp(OutOfBounds), /// Timestamp header field is too far in future. - TemporarilyInvalid(OutOfBounds), + TemporarilyInvalid(OutOfBounds), /// Log bloom header field is invalid. InvalidLogBloom(Mismatch), /// Number field of header is invalid. @@ -125,8 +126,14 @@ impl fmt::Display for BlockError { InvalidSeal => "Block has invalid seal.".into(), InvalidGasLimit(ref oob) => format!("Invalid gas limit: {}", oob), InvalidReceiptsRoot(ref mis) => format!("Invalid receipts trie root in header: {}", mis), - InvalidTimestamp(ref oob) => format!("Invalid timestamp in header: {}", oob), - TemporarilyInvalid(ref oob) => format!("Future timestamp in header: {}", oob), + InvalidTimestamp(ref oob) => { + let oob = oob.map(|st| st.elapsed().unwrap_or_default().as_secs()); + format!("Invalid timestamp in header: {}", oob) + }, + TemporarilyInvalid(ref oob) => { + let oob = oob.map(|st| st.elapsed().unwrap_or_default().as_secs()); + format!("Future timestamp in header: {}", oob) + }, InvalidLogBloom(ref oob) => format!("Invalid log bloom in header: {}", oob), InvalidNumber(ref mis) => format!("Invalid number in header: {}", mis), RidiculousNumber(ref oob) => format!("Implausible block number. {}", oob), diff --git a/ethcore/src/verification/verification.rs b/ethcore/src/verification/verification.rs index e6289c2b0d6..195cae8a526 100644 --- a/ethcore/src/verification/verification.rs +++ b/ethcore/src/verification/verification.rs @@ -22,7 +22,7 @@ //! 3. Final verification against the blockchain done before enactment. use std::collections::HashSet; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bytes::Bytes; use ethereum_types::H256; @@ -284,11 +284,10 @@ pub fn verify_header_params(header: &Header, engine: &EthEngine, is_full: bool) } if is_full { - const ACCEPTABLE_DRIFT_SECS: u64 = 15; - let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default(); - let max_time = now.as_secs() + ACCEPTABLE_DRIFT_SECS; - let invalid_threshold = max_time + ACCEPTABLE_DRIFT_SECS * 9; - let timestamp = header.timestamp(); + const ACCEPTABLE_DRIFT: Duration = Duration::from_secs(15); + let max_time = SystemTime::now() + ACCEPTABLE_DRIFT; + let invalid_threshold = max_time + ACCEPTABLE_DRIFT * 9; + let timestamp = UNIX_EPOCH + Duration::from_secs(header.timestamp()); if timestamp > invalid_threshold { return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: Some(max_time), min: None, found: timestamp }))) @@ -310,7 +309,9 @@ fn verify_parent(header: &Header, parent: &Header, engine: &EthEngine) -> Result let gas_limit_divisor = engine.params().gas_limit_bound_divisor; if !engine.is_timestamp_valid(header.timestamp(), parent.timestamp()) { - return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(parent.timestamp() + 1), found: header.timestamp() }))) + let min = SystemTime::now() + Duration::from_secs(parent.timestamp() + 1); + let found = SystemTime::now() + Duration::from_secs(header.timestamp()); + return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(min), found }))) } if header.number() != parent.number() + 1 { return Err(From::from(BlockError::InvalidNumber(Mismatch { expected: parent.number() + 1, found: header.number() }))); diff --git a/local-store/src/lib.rs b/local-store/src/lib.rs index 9120b8694a1..2ebc6a69ce2 100644 --- a/local-store/src/lib.rs +++ b/local-store/src/lib.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::fmt; +use std::time::Duration; use transaction::{ SignedTransaction, PendingTransaction, UnverifiedTransaction, @@ -50,7 +51,7 @@ extern crate kvdb_memorydb; const LOCAL_TRANSACTIONS_KEY: &'static [u8] = &*b"LOCAL_TXS"; const UPDATE_TIMER: ::io::TimerToken = 0; -const UPDATE_TIMEOUT_MS: u64 = 15 * 60 * 1000; // once every 15 minutes. +const UPDATE_TIMEOUT: Duration = Duration::from_secs(15 * 60); // once every 15 minutes. /// Errors which can occur while using the local data store. #[derive(Debug)] @@ -205,7 +206,7 @@ impl LocalDataStore { impl IoHandler for LocalDataStore { fn initialize(&self, io: &::io::IoContext) { - if let Err(e) = io.register_timer(UPDATE_TIMER, UPDATE_TIMEOUT_MS) { + if let Err(e) = io.register_timer(UPDATE_TIMER, UPDATE_TIMEOUT) { warn!(target: "local_store", "Error registering local store update timer: {}", e); } } diff --git a/parity/informant.rs b/parity/informant.rs index b834b018d6c..55a9df3d79e 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -399,7 +399,7 @@ const INFO_TIMER: TimerToken = 0; impl IoHandler for Informant { fn initialize(&self, io: &IoContext) { - io.register_timer(INFO_TIMER, 5000).expect("Error registering timer"); + io.register_timer(INFO_TIMER, Duration::from_secs(5)).expect("Error registering timer"); } fn timeout(&self, _io: &IoContext, timer: TimerToken) { diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs index 9acf2bf1f9c..94a8affa5ba 100644 --- a/parity/light_helpers/queue_cull.rs +++ b/parity/light_helpers/queue_cull.rs @@ -35,10 +35,10 @@ use parking_lot::RwLock; // Attepmt to cull once every 10 minutes. const TOKEN: TimerToken = 1; -const TIMEOUT_MS: u64 = 1000 * 60 * 10; +const TIMEOUT: Duration = Duration::from_secs(60 * 10); // But make each attempt last only 9 minutes -const PURGE_TIMEOUT: Duration = Duration::from_millis(1000 * 60 * 9); +const PURGE_TIMEOUT: Duration = Duration::from_secs(60 * 9); /// Periodically culls the transaction queue of mined transactions. pub struct QueueCull { @@ -56,7 +56,7 @@ pub struct QueueCull { impl IoHandler for QueueCull { fn initialize(&self, io: &IoContext) { - io.register_timer(TOKEN, TIMEOUT_MS).expect("Error registering timer"); + io.register_timer(TOKEN, TIMEOUT).expect("Error registering timer"); } fn timeout(&self, _io: &IoContext, timer: TimerToken) { diff --git a/rpc/src/v1/impls/personal.rs b/rpc/src/v1/impls/personal.rs index c3560fe9df3..03495fd37f8 100644 --- a/rpc/src/v1/impls/personal.rs +++ b/rpc/src/v1/impls/personal.rs @@ -16,6 +16,7 @@ //! Account management (personal) rpc implementation use std::sync::Arc; +use std::time::Duration; use bytes::{Bytes, ToPretty}; use ethcore::account_provider::AccountProvider; @@ -130,8 +131,8 @@ impl Personal for PersonalClient { Some("Restart your client with --geth flag or use personal_sendTransaction instead."), )), (true, Some(0)) => store.unlock_account_permanently(account, account_pass), - (true, Some(d)) => store.unlock_account_timed(account, account_pass, d * 1000), - (true, None) => store.unlock_account_timed(account, account_pass, 300_000), + (true, Some(d)) => store.unlock_account_timed(account, account_pass, Duration::from_secs(d.into())), + (true, None) => store.unlock_account_timed(account, account_pass, Duration::from_secs(300)), }; match r { Ok(_) => Ok(true), diff --git a/sync/src/api.rs b/sync/src/api.rs index 735b6eb033c..91502e04e94 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::collections::{HashMap, BTreeMap}; use std::io; +use std::time::Duration; use bytes::Bytes; use devp2p::{NetworkService, ConnectionFilter}; use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId, @@ -368,7 +369,7 @@ struct SyncProtocolHandler { impl NetworkProtocolHandler for SyncProtocolHandler { fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { - io.register_timer(0, 1000).expect("Error registering sync timer"); + io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer"); } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 97802459129..9b45e3b1122 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -90,7 +90,7 @@ use std::collections::{HashSet, HashMap}; use std::cmp; -use std::time::Instant; +use std::time::{Duration, Instant}; use hash::keccak; use heapsize::HeapSizeOf; use ethereum_types::{H256, U256}; @@ -165,14 +165,14 @@ pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x16; const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; -const WAIT_PEERS_TIMEOUT_SEC: u64 = 5; -const STATUS_TIMEOUT_SEC: u64 = 5; -const HEADERS_TIMEOUT_SEC: u64 = 15; -const BODIES_TIMEOUT_SEC: u64 = 20; -const RECEIPTS_TIMEOUT_SEC: u64 = 10; -const FORK_HEADER_TIMEOUT_SEC: u64 = 3; -const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 5; -const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 120; +const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5); +const STATUS_TIMEOUT: Duration = Duration::from_secs(5); +const HEADERS_TIMEOUT: Duration = Duration::from_secs(15); +const BODIES_TIMEOUT: Duration = Duration::from_secs(20); +const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10); +const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); +const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); +const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Sync state @@ -558,7 +558,7 @@ impl ChainSync { (best_hash, max_peers, snapshot_peers) }; - let timeout = (self.state == SyncState::WaitingPeers) && self.sync_start_time.map_or(false, |t| t.elapsed().as_secs() > WAIT_PEERS_TIMEOUT_SEC); + let timeout = (self.state == SyncState::WaitingPeers) && self.sync_start_time.map_or(false, |t| t.elapsed() > WAIT_PEERS_TIMEOUT); if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) { if max_peers >= SNAPSHOT_MIN_PEERS { @@ -1806,15 +1806,15 @@ impl ChainSync { let tick = Instant::now(); let mut aborting = Vec::new(); for (peer_id, peer) in &self.peers { - let elapsed = (tick - peer.ask_time).as_secs(); + let elapsed = tick - peer.ask_time; let timeout = match peer.asking { - PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT_SEC, - PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT_SEC, - PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT_SEC, + PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT, + PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT, + PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT, PeerAsking::Nothing => false, - PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT_SEC, - PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT_SEC, - PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT_SEC, + PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT, + PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT, + PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT, }; if timeout { trace!(target:"sync", "Timeout {}", peer_id); @@ -1829,7 +1829,7 @@ impl ChainSync { // Check for handshake timeouts for (peer, &ask_time) in &self.handshaking_peers { let elapsed = (tick - ask_time) / 1_000_000_000; - if elapsed.as_secs() > STATUS_TIMEOUT_SEC { + if elapsed > STATUS_TIMEOUT { trace!(target:"sync", "Status timeout {}", peer); io.disconnect_peer(*peer); } diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index ef1bd8742f3..b16db2dff1d 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -58,12 +58,12 @@ mod sync_round; #[cfg(test)] mod tests; -// Base number of milliseconds for the header request timeout. -const REQ_TIMEOUT_MILLISECS_BASE: u64 = 7000; -// Additional number of milliseconds for each requested header. +// Base value for the header request timeout. +const REQ_TIMEOUT_BASE: Duration = Duration::from_secs(7); +// Additional value for each requested header. // If we request N headers, then the timeout will be: -// REQ_TIMEOUT_MILLISECS_BASE + N * REQ_TIMEOUT_MILLISECS_PER_HEADER -const REQ_TIMEOUT_MILLISECS_PER_HEADER: u64 = 10; +// REQ_TIMEOUT_BASE + N * REQ_TIMEOUT_PER_HEADER +const REQ_TIMEOUT_PER_HEADER: Duration = Duration::from_millis(10); /// Peer chain info. #[derive(Debug, Clone, PartialEq, Eq)] @@ -574,11 +574,12 @@ impl LightSync { if requested_from.contains(peer) { continue } match ctx.request_from(*peer, request.clone()) { Ok(id) => { - let timeout_ms = REQ_TIMEOUT_MILLISECS_BASE + - req.max * REQ_TIMEOUT_MILLISECS_PER_HEADER; + assert!(req.max <= u32::max_value() as u64, + "requesting more than 2^32 headers at a time would overflow"); + let timeout = REQ_TIMEOUT_BASE + REQ_TIMEOUT_PER_HEADER * req.max as u32; self.pending_reqs.lock().insert(id.clone(), PendingReq { started: Instant::now(), - timeout: Duration::from_millis(timeout_ms), + timeout, }); requested_from.insert(peer.clone()); diff --git a/util/io/src/service.rs b/util/io/src/service.rs index baa279cabdb..19f2d4b3bb5 100644 --- a/util/io/src/service.rs +++ b/util/io/src/service.rs @@ -54,7 +54,7 @@ pub enum IoMessage where Message: Send + Clone + Sized { AddTimer { handler_id: HandlerId, token: TimerToken, - delay: u64, + delay: Duration, once: bool, }, RemoveTimer { @@ -93,10 +93,10 @@ impl IoContext where Message: Send + Clone + Sync + 'static { } /// Register a new recurring IO timer. 'IoHandler::timeout' will be called with the token. - pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), IoError> { + pub fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), IoError> { self.channel.send_io(IoMessage::AddTimer { - token: token, - delay: ms, + token, + delay, handler_id: self.handler, once: false, })?; @@ -104,10 +104,10 @@ impl IoContext where Message: Send + Clone + Sync + 'static { } /// Register a new IO timer once. 'IoHandler::timeout' will be called with the token. - pub fn register_timer_once(&self, token: TimerToken, ms: u64) -> Result<(), IoError> { + pub fn register_timer_once(&self, token: TimerToken, delay: Duration) -> Result<(), IoError> { self.channel.send_io(IoMessage::AddTimer { - token: token, - delay: ms, + token, + delay, handler_id: self.handler, once: true, })?; @@ -173,7 +173,7 @@ impl IoContext where Message: Send + Clone + Sync + 'static { #[derive(Clone)] struct UserTimer { - delay: u64, + delay: Duration, timeout: Timeout, once: bool, } @@ -252,7 +252,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync self.timers.write().remove(&token_id); event_loop.clear_timeout(&timer.timeout); } else { - event_loop.timeout(token, Duration::from_millis(timer.delay)).expect("Error re-registering user timer"); + event_loop.timeout(token, timer.delay).expect("Error re-registering user timer"); } self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index }); self.work_ready.notify_all(); @@ -283,7 +283,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync }, IoMessage::AddTimer { handler_id, token, delay, once } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; - let timeout = event_loop.timeout(Token(timer_id), Duration::from_millis(delay)).expect("Error registering user timer"); + let timeout = event_loop.timeout(Token(timer_id), delay).expect("Error registering user timer"); self.timers.write().insert(timer_id, UserTimer { delay: delay, timeout: timeout, once: once }); }, IoMessage::RemoveTimer { handler_id, token } => { diff --git a/util/network-devp2p/src/connection.rs b/util/network-devp2p/src/connection.rs index 4d775b215f0..1db903feb5f 100644 --- a/util/network-devp2p/src/connection.rs +++ b/util/network-devp2p/src/connection.rs @@ -17,6 +17,7 @@ use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::time::Duration; use hash::{keccak, write_keccak}; use mio::{Token, Ready, PollOpt}; use mio::deprecated::{Handler, EventLoop, TryRead, TryWrite}; @@ -37,7 +38,7 @@ use crypto; use network::{Error, ErrorKind}; const ENCRYPTED_HEADER_LEN: usize = 32; -const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000; +const RECIEVE_PAYLOAD: Duration = Duration::from_secs(30); pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1; pub trait GenericSocket : Read + Write { @@ -447,7 +448,7 @@ impl EncryptedConnection { if let EncryptedConnectionState::Header = self.read_state { if let Some(data) = self.connection.readable()? { self.read_header(&data)?; - io.register_timer(self.connection.token, RECIEVE_PAYLOAD_TIMEOUT)?; + io.register_timer(self.connection.token, RECIEVE_PAYLOAD)?; } }; if let EncryptedConnectionState::Payload = self.read_state { diff --git a/util/network-devp2p/src/handshake.rs b/util/network-devp2p/src/handshake.rs index 020b65477ba..d7818b64d95 100644 --- a/util/network-devp2p/src/handshake.rs +++ b/util/network-devp2p/src/handshake.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::time::Duration; use rand::random; use hash::write_keccak; use mio::tcp::*; @@ -73,7 +74,7 @@ pub struct Handshake { const V4_AUTH_PACKET_SIZE: usize = 307; const V4_ACK_PACKET_SIZE: usize = 210; -const HANDSHAKE_TIMEOUT: u64 = 5000; +const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5); const PROTOCOL_VERSION: u64 = 4; // Amount of bytes added when encrypting with encryptECIES. const ECIES_OVERHEAD: usize = 113; diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 7056c67f6cb..73ca2aca4bd 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -24,6 +24,7 @@ use std::cmp::{min, max}; use std::path::{Path, PathBuf}; use std::io::{Read, Write, self}; use std::fs; +use std::time::Duration; use ethkey::{KeyPair, Secret, Random, Generator}; use hash::keccak; use mio::*; @@ -67,13 +68,13 @@ const SYS_TIMER: TimerToken = LAST_SESSION + 1; // Timeouts // for IDLE TimerToken -const MAINTENANCE_TIMEOUT: u64 = 1000; +const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1); // for DISCOVERY_REFRESH TimerToken -const DISCOVERY_REFRESH_TIMEOUT: u64 = 60_000; +const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60); // for DISCOVERY_ROUND TimerToken -const DISCOVERY_ROUND_TIMEOUT: u64 = 300; +const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300); // for NODE_TABLE TimerToken -const NODE_TABLE_TIMEOUT: u64 = 300_000; +const NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300); #[derive(Debug, PartialEq, Eq)] /// Protocol info @@ -165,10 +166,10 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> { self.session.as_ref().map_or(false, |s| s.lock().expired()) } - fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> { + fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> { self.io.message(NetworkIoMessage::AddTimer { - token: token, - delay: ms, + token, + delay, protocol: self.protocol, }).unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e)); Ok(()) diff --git a/util/network-devp2p/src/session.rs b/util/network-devp2p/src/session.rs index 6353a90946f..c1e09a25252 100644 --- a/util/network-devp2p/src/session.rs +++ b/util/network-devp2p/src/session.rs @@ -34,8 +34,8 @@ use node_table::NodeId; use snappy; // Timeout must be less than (interval - 1). -const PING_TIMEOUT_SEC: Duration = Duration::from_secs(60); -const PING_INTERVAL_SEC: Duration = Duration::from_secs(120); +const PING_TIMEOUT: Duration = Duration::from_secs(60); +const PING_INTERVAL: Duration = Duration::from_secs(120); const MIN_PROTOCOL_VERSION: u32 = 4; const MIN_COMPRESSION_PROTOCOL_VERSION: u32 = 5; @@ -116,7 +116,7 @@ impl Session { protocol_version: 0, capabilities: Vec::new(), peer_capabilities: Vec::new(), - ping_ms: None, + ping: None, originated: originated, remote_address: "Handshake".to_owned(), local_address: local_addr, @@ -298,12 +298,12 @@ impl Session { return true; } let timed_out = if let Some(pong) = self.pong_time { - pong.duration_since(self.ping_time) > PING_TIMEOUT_SEC + pong.duration_since(self.ping_time) > PING_TIMEOUT } else { - self.ping_time.elapsed() > PING_TIMEOUT_SEC + self.ping_time.elapsed() > PING_TIMEOUT }; - if !timed_out && self.ping_time.elapsed() > PING_INTERVAL_SEC { + if !timed_out && self.ping_time.elapsed() > PING_INTERVAL { if let Err(e) = self.send_ping(io) { debug!("Error sending ping message: {:?}", e); } @@ -368,9 +368,7 @@ impl Session { PACKET_PONG => { let time = Instant::now(); self.pong_time = Some(time); - let ping_elapsed = time.duration_since(self.ping_time); - self.info.ping_ms = Some(ping_elapsed.as_secs() * 1_000 + - ping_elapsed.subsec_nanos() as u64 / 1_000_000); + self.info.ping = Some(time.duration_since(self.ping_time)); Ok(SessionData::Continue) }, PACKET_GET_PEERS => Ok(SessionData::None), //TODO; diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index 7bd13d1f392..c5365930bc9 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -37,6 +37,7 @@ use std::collections::HashMap; use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr}; use std::str::{self, FromStr}; use std::sync::Arc; +use std::time::Duration; use ipnetwork::{IpNetwork, IpNetworkError}; use io::IoChannel; use ethkey::Secret; @@ -74,8 +75,8 @@ pub enum NetworkIoMessage { protocol: ProtocolId, /// Timer token. token: TimerToken, - /// Timer delay in milliseconds. - delay: u64, + /// Timer delay. + delay: Duration, }, /// Initliaze public interface. InitPublicInterface, @@ -100,8 +101,8 @@ pub struct SessionInfo { pub capabilities: Vec, /// Peer protocol capabilities pub peer_capabilities: Vec, - /// Peer ping delay in milliseconds - pub ping_ms: Option, + /// Peer ping delay + pub ping: Option, /// True if this session was originated by us. pub originated: bool, /// Remote endpoint address of the session @@ -271,7 +272,7 @@ pub trait NetworkContext { fn is_expired(&self) -> bool; /// Register a new IO timer. 'IoHandler::timeout' will be called with the token. - fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error>; + fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error>; /// Returns peer identification string fn peer_client_version(&self, peer: PeerId) -> String; @@ -315,8 +316,8 @@ impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext { (**self).is_expired() } - fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> { - (**self).register_timer(token, ms) + fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> { + (**self).register_timer(token, delay) } fn peer_client_version(&self, peer: PeerId) -> String { diff --git a/util/unexpected/src/lib.rs b/util/unexpected/src/lib.rs index e34b2326cf3..4cf8448bd49 100644 --- a/util/unexpected/src/lib.rs +++ b/util/unexpected/src/lib.rs @@ -44,6 +44,18 @@ pub struct OutOfBounds { pub found: T, } +impl OutOfBounds { + pub fn map(self, map: F) -> OutOfBounds + where F: Fn(T) -> U + { + OutOfBounds { + min: self.min.map(&map), + max: self.max.map(&map), + found: map(self.found), + } + } +} + impl fmt::Display for OutOfBounds { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let msg = match (self.min.as_ref(), self.max.as_ref()) { diff --git a/whisper/src/net/mod.rs b/whisper/src/net/mod.rs index 28f3fee55a2..dd5e345bcde 100644 --- a/whisper/src/net/mod.rs +++ b/whisper/src/net/mod.rs @@ -36,7 +36,7 @@ mod tests; // how often periodic relays are. when messages are imported // we directly broadcast. const RALLY_TOKEN: TimerToken = 1; -const RALLY_TIMEOUT_MS: u64 = 2500; +const RALLY_TIMEOUT: Duration = Duration::from_millis(2500); /// Current protocol version. pub const PROTOCOL_VERSION: usize = 6; @@ -685,7 +685,7 @@ impl Network { impl ::network::NetworkProtocolHandler for Network { fn initialize(&self, io: &NetworkContext, host_info: &HostInfo) { // set up broadcast timer (< 1s) - io.register_timer(RALLY_TOKEN, RALLY_TIMEOUT_MS) + io.register_timer(RALLY_TOKEN, RALLY_TIMEOUT) .expect("Failed to initialize message rally timer"); *self.node_key.write() = host_info.id().clone();