Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
More code refactoring to integrate Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka committed Apr 6, 2018
1 parent 27c32d3 commit 49cb890
Show file tree
Hide file tree
Showing 25 changed files with 168 additions and 132 deletions.
4 changes: 2 additions & 2 deletions dapps/src/tests/helpers/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl FetchControl {
}

pub fn wait_for_requests(&self, len: usize) {
const MAX_TIMEOUT_MS: u64 = 5000;
const MAX_TIMEOUT: time::Duration = time::Duration::from_millis(5000);
const ATTEMPTS: u64 = 10;
let mut attempts_left = ATTEMPTS;
loop {
Expand All @@ -50,7 +50,7 @@ impl FetchControl {
} else {
attempts_left -= 1;
// Should we handle spurious timeouts better?
thread::park_timeout(time::Duration::from_millis(MAX_TIMEOUT_MS / ATTEMPTS));
thread::park_timeout(MAX_TIMEOUT / ATTEMPTS);
}
}
}
Expand Down
26 changes: 14 additions & 12 deletions ethcore/light/src/net/load_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ pub trait SampleStore: Send + Sync {
}

// get a hardcoded, arbitrarily determined (but intended overestimate)
// of the time in nanoseconds to serve a request of the given kind.
// of the time it takes to serve a request of the given kind.
//
// TODO: seed this with empirical data.
fn hardcoded_serve_time(kind: Kind) -> u64 {
match kind {
fn hardcoded_serve_time(kind: Kind) -> Duration {
Duration::new(0, match kind {
Kind::Headers => 500_000,
Kind::HeaderProof => 500_000,
Kind::TransactionIndex => 500_000,
Expand All @@ -63,7 +63,7 @@ fn hardcoded_serve_time(kind: Kind) -> u64 {
Kind::Code => 1_500_000,
Kind::Execution => 250, // per gas.
Kind::Signal => 500_000,
}
})
}

/// A no-op store.
Expand Down Expand Up @@ -114,10 +114,10 @@ impl LoadDistribution {
}
}

/// Calculate EMA of load in nanoseconds for a specific request kind.
/// Calculate EMA of load for a specific request kind.
/// If there is no data for the given request kind, no EMA will be calculated,
/// but a hardcoded time will be returned.
pub fn expected_time_ns(&self, kind: Kind) -> u64 {
pub fn expected_time(&self, kind: Kind) -> Duration {
let samples = self.samples.read();
samples.get(&kind).and_then(|s| {
if s.len() == 0 { return None }
Expand All @@ -128,7 +128,9 @@ impl LoadDistribution {
(alpha * c as f64) + ((1.0 - alpha) * a)
});

Some(ema as u64)
// TODO: use `Duration::from_nanos` once stable (https://github.com/rust-lang/rust/issues/46507)
let ema = ema as u64;
Some(Duration::new(ema / 1_000_000_000, (ema % 1_000_000_000) as u32))
}).unwrap_or_else(move || hardcoded_serve_time(kind))
}

Expand Down Expand Up @@ -223,12 +225,12 @@ mod tests {
#[test]
fn hardcoded_before_data() {
let dist = LoadDistribution::load(&NullStore);
assert_eq!(dist.expected_time_ns(Kind::Headers), hardcoded_serve_time(Kind::Headers));
assert_eq!(dist.expected_time(Kind::Headers), hardcoded_serve_time(Kind::Headers));

dist.update(Kind::Headers, Duration::new(0, 100_000), 100);
dist.end_period(&NullStore);

assert_eq!(dist.expected_time_ns(Kind::Headers), 1000);
assert_eq!(dist.expected_time(Kind::Headers), Duration::new(0, 1000));
}

#[test]
Expand All @@ -244,7 +246,7 @@ mod tests {
sum += x;
if i == 0 { continue }

let moving_average = dist.expected_time_ns(Kind::Headers);
let moving_average = dist.expected_time(Kind::Headers);

// should be weighted below the maximum entry.
let arith_average = (sum as f64 / (i + 1) as f64) as u64;
Expand All @@ -255,9 +257,9 @@ mod tests {
// otherwise, the weight should be below the arithmetic mean because the much
// smaller previous values are discounted less.
if i == 1 {
assert_eq!(moving_average, arith_average);
assert_eq!(moving_average, Duration::new(0, arith_average));
} else {
assert!(moving_average < arith_average)
assert!(moving_average < Duration::new(0, arith_average))
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ pub use self::load_timer::{SampleStore, FileStore};
pub use self::status::{Status, Capabilities, Announcement};

const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000;
const TIMEOUT_INTERVAL: Duration = Duration::from_secs(1);

const TICK_TIMEOUT: TimerToken = 1;
const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000;
const TICK_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);

const PROPAGATE_TIMEOUT: TimerToken = 2;
const PROPAGATE_TIMEOUT_INTERVAL_MS: u64 = 5000;
const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);

const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL_MS: u64 = 60 * 60 * 1000;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);

// minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
Expand Down Expand Up @@ -369,9 +369,9 @@ impl LightProtocol {
let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore));
let load_distribution = LoadDistribution::load(&*sample_store);
let flow_params = FlowParams::from_request_times(
|kind| load_distribution.expected_time_ns(kind),
|kind| load_distribution.expected_time(kind),
params.config.load_share,
params.config.max_stored_seconds,
Duration::from_secs(params.config.max_stored_seconds),
);

LightProtocol {
Expand Down Expand Up @@ -766,9 +766,9 @@ impl LightProtocol {
self.load_distribution.end_period(&*self.sample_store);

let new_params = Arc::new(FlowParams::from_request_times(
|kind| self.load_distribution.expected_time_ns(kind),
|kind| self.load_distribution.expected_time(kind),
self.config.load_share,
self.config.max_stored_seconds,
Duration::from_secs(self.config.max_stored_seconds),
));
*self.flow_params.write() = new_params.clone();

Expand Down Expand Up @@ -1080,13 +1080,13 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) {

impl NetworkProtocolHandler for LightProtocol {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS)
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS)
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL_MS)
io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL_MS)
io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL)
.expect("Error registering request timer interval token.");
}

Expand Down
21 changes: 14 additions & 7 deletions ethcore/light/src/net/request_credits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,30 @@ impl FlowParams {

/// Create new flow parameters from ,
/// proportion of total capacity which should be given to a peer,
/// and number of seconds of stored capacity a peer can accumulate.
pub fn from_request_times<F: Fn(::request::Kind) -> u64>(
request_time_ns: F,
/// and stored capacity a peer can accumulate.
pub fn from_request_times<F: Fn(::request::Kind) -> Duration>(
request_time: F,
load_share: f64,
max_stored_seconds: u64
max_stored: Duration
) -> Self {
use request::Kind;

let load_share = load_share.abs();

let recharge: u64 = 100_000_000;
let max = recharge.saturating_mul(max_stored_seconds);
let max = {
let sec = max_stored.as_secs().saturating_mul(recharge);
let nanos = (max_stored.subsec_nanos() as u64).saturating_mul(recharge) / 1_000_000_000;
sec + nanos
};

let cost_for_kind = |kind| {
// how many requests we can handle per second
let ns = request_time_ns(kind);
let second_duration = 1_000_000_000f64 / ns as f64;
let rq_dur = request_time(kind);
let second_duration = {
let as_ns = rq_dur.as_secs() as f64 * 1_000_000_000f64 + rq_dur.subsec_nanos() as f64;
1_000_000_000f64 / as_ns
};

// scale by share of the load given to this peer.
let serve_per_second = second_duration * load_share;
Expand Down
9 changes: 5 additions & 4 deletions ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::sync::Arc;
use std::path::Path;
use std::time::Duration;

use ansi_term::Colour;
use io::{IoContext, TimerToken, IoHandler, IoService, IoError};
Expand Down Expand Up @@ -140,13 +141,13 @@ struct ClientIoHandler {
const CLIENT_TICK_TIMER: TimerToken = 0;
const SNAPSHOT_TICK_TIMER: TimerToken = 1;

const CLIENT_TICK_MS: u64 = 5000;
const SNAPSHOT_TICK_MS: u64 = 10000;
const CLIENT_TICK: Duration = Duration::from_secs(5);
const SNAPSHOT_TICK: Duration = Duration::from_secs(10);

impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer");
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK).expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK).expect("Error registering snapshot timer");
}

fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
Expand Down
10 changes: 5 additions & 5 deletions ethcore/src/account_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ impl AccountProvider {
}

/// Unlocks account temporarily with a timeout.
pub fn unlock_account_timed(&self, account: Address, password: String, duration_ms: u32) -> Result<(), Error> {
self.unlock_account(account, password, Unlock::Timed(Instant::now() + Duration::from_millis(duration_ms as u64)))
pub fn unlock_account_timed(&self, account: Address, password: String, duration: Duration) -> Result<(), Error> {
self.unlock_account(account, password, Unlock::Timed(Instant::now() + duration))
}

/// Checks if given account is unlocked
Expand Down Expand Up @@ -837,7 +837,7 @@ impl AccountProvider {
#[cfg(test)]
mod tests {
use super::{AccountProvider, Unlock, DappId};
use std::time::Instant;
use std::time::{Duration, Instant};
use ethstore::ethkey::{Generator, Random, Address};
use ethstore::{StoreAccountRef, Derivation};
use ethereum_types::H256;
Expand Down Expand Up @@ -941,8 +941,8 @@ mod tests {
let kp = Random.generate().unwrap();
let ap = AccountProvider::transient_provider();
assert!(ap.insert_account(kp.secret().clone(), "test").is_ok());
assert!(ap.unlock_account_timed(kp.address(), "test1".into(), 60000).is_err());
assert!(ap.unlock_account_timed(kp.address(), "test".into(), 60000).is_ok());
assert!(ap.unlock_account_timed(kp.address(), "test1".into(), Duration::from_secs(60)).is_err());
assert!(ap.unlock_account_timed(kp.address(), "test".into(), Duration::from_secs(60)).is_ok());
assert!(ap.sign(kp.address(), None, Default::default()).is_ok());
ap.unlocked.write().get_mut(&StoreAccountRef::root(kp.address())).unwrap().unlock = Unlock::Timed(Instant::now());
assert!(ap.sign(kp.address(), None, Default::default()).is_err());
Expand Down
9 changes: 5 additions & 4 deletions ethcore/src/engines/authority_round/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::fmt;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Weak, Arc};
use std::time::{UNIX_EPOCH, Duration};
use std::time::{UNIX_EPOCH, SystemTime, Duration};
use std::collections::{BTreeMap, HashSet};
use std::iter::FromIterator;

Expand Down Expand Up @@ -536,6 +536,7 @@ fn verify_timestamp(step: &Step, header_step: usize) -> Result<(), BlockError> {
// NOTE This error might be returned only in early stage of verification (Stage 1).
// Returning it further won't recover the sync process.
trace!(target: "engine", "verify_timestamp: block too early");
let oob = oob.map(|n| SystemTime::now() + Duration::from_secs(n));
Err(BlockError::TemporarilyInvalid(oob).into())
},
Ok(_) => Ok(()),
Expand Down Expand Up @@ -694,8 +695,8 @@ const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
impl IoHandler<()> for TransitionHandler {
fn initialize(&self, io: &IoContext<()>) {
if let Some(engine) = self.engine.upgrade() {
let remaining = engine.step.duration_remaining();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, remaining.as_millis())
let remaining = engine.step.duration_remaining().as_millis();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
}
}
Expand All @@ -711,7 +712,7 @@ impl IoHandler<()> for TransitionHandler {
}

let next_run_at = engine.step.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, next_run_at)
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}
}
Expand Down
3 changes: 1 addition & 2 deletions ethcore/src/engines/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ impl<S, M: Machine> TransitionHandler<S, M> where S: Sync + Send + Clone {
pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;

fn set_timeout<S: Sync + Send + Clone>(io: &IoContext<S>, timeout: Duration) {
let ms = timeout.as_secs() * 1_000 + timeout.subsec_nanos() as u64 / 1_000_000;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, ms)
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, timeout)
.unwrap_or_else(|e| warn!(target: "engine", "Failed to set consensus step timeout: {}.", e))
}

Expand Down
15 changes: 11 additions & 4 deletions ethcore/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! General error types for use in ethcore.
use std::fmt;
use std::time::SystemTime;
use kvdb;
use ethereum_types::{H256, U256, Address, Bloom};
use util_error::UtilError;
Expand Down Expand Up @@ -81,9 +82,9 @@ pub enum BlockError {
/// Receipts trie root header field is invalid.
InvalidReceiptsRoot(Mismatch<H256>),
/// Timestamp header field is invalid.
InvalidTimestamp(OutOfBounds<u64>),
InvalidTimestamp(OutOfBounds<SystemTime>),
/// Timestamp header field is too far in future.
TemporarilyInvalid(OutOfBounds<u64>),
TemporarilyInvalid(OutOfBounds<SystemTime>),
/// Log bloom header field is invalid.
InvalidLogBloom(Mismatch<Bloom>),
/// Number field of header is invalid.
Expand Down Expand Up @@ -125,8 +126,14 @@ impl fmt::Display for BlockError {
InvalidSeal => "Block has invalid seal.".into(),
InvalidGasLimit(ref oob) => format!("Invalid gas limit: {}", oob),
InvalidReceiptsRoot(ref mis) => format!("Invalid receipts trie root in header: {}", mis),
InvalidTimestamp(ref oob) => format!("Invalid timestamp in header: {}", oob),
TemporarilyInvalid(ref oob) => format!("Future timestamp in header: {}", oob),
InvalidTimestamp(ref oob) => {
let oob = oob.map(|st| st.elapsed().unwrap_or_default().as_secs());
format!("Invalid timestamp in header: {}", oob)
},
TemporarilyInvalid(ref oob) => {
let oob = oob.map(|st| st.elapsed().unwrap_or_default().as_secs());
format!("Future timestamp in header: {}", oob)
},
InvalidLogBloom(ref oob) => format!("Invalid log bloom in header: {}", oob),
InvalidNumber(ref mis) => format!("Invalid number in header: {}", mis),
RidiculousNumber(ref oob) => format!("Implausible block number. {}", oob),
Expand Down
15 changes: 8 additions & 7 deletions ethcore/src/verification/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! 3. Final verification against the blockchain done before enactment.
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use bytes::Bytes;
use ethereum_types::H256;
Expand Down Expand Up @@ -284,11 +284,10 @@ pub fn verify_header_params(header: &Header, engine: &EthEngine, is_full: bool)
}

if is_full {
const ACCEPTABLE_DRIFT_SECS: u64 = 15;
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
let max_time = now.as_secs() + ACCEPTABLE_DRIFT_SECS;
let invalid_threshold = max_time + ACCEPTABLE_DRIFT_SECS * 9;
let timestamp = header.timestamp();
const ACCEPTABLE_DRIFT: Duration = Duration::from_secs(15);
let max_time = SystemTime::now() + ACCEPTABLE_DRIFT;
let invalid_threshold = max_time + ACCEPTABLE_DRIFT * 9;
let timestamp = UNIX_EPOCH + Duration::from_secs(header.timestamp());

if timestamp > invalid_threshold {
return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: Some(max_time), min: None, found: timestamp })))
Expand All @@ -310,7 +309,9 @@ fn verify_parent(header: &Header, parent: &Header, engine: &EthEngine) -> Result
let gas_limit_divisor = engine.params().gas_limit_bound_divisor;

if !engine.is_timestamp_valid(header.timestamp(), parent.timestamp()) {
return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(parent.timestamp() + 1), found: header.timestamp() })))
let min = SystemTime::now() + Duration::from_secs(parent.timestamp() + 1);
let found = SystemTime::now() + Duration::from_secs(header.timestamp());
return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(min), found })))
}
if header.number() != parent.number() + 1 {
return Err(From::from(BlockError::InvalidNumber(Mismatch { expected: parent.number() + 1, found: header.number() })));
Expand Down
Loading

0 comments on commit 49cb890

Please sign in to comment.