Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

More code refactoring to integrate Duration #8322

Merged
merged 6 commits into from
Apr 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dapps/src/tests/helpers/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl FetchControl {
}

pub fn wait_for_requests(&self, len: usize) {
const MAX_TIMEOUT_MS: u64 = 5000;
const ATTEMPTS: u64 = 10;
const MAX_TIMEOUT: time::Duration = time::Duration::from_millis(5000);
const ATTEMPTS: u32 = 10;
let mut attempts_left = ATTEMPTS;
loop {
let current = self.fetch.requested.lock().len();
Expand All @@ -50,7 +50,7 @@ impl FetchControl {
} else {
attempts_left -= 1;
// Should we handle spurious timeouts better?
thread::park_timeout(time::Duration::from_millis(MAX_TIMEOUT_MS / ATTEMPTS));
thread::park_timeout(MAX_TIMEOUT / ATTEMPTS);
}
}
}
Expand Down
30 changes: 16 additions & 14 deletions ethcore/light/src/net/load_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ pub trait SampleStore: Send + Sync {
}

// get a hardcoded, arbitrarily determined (but intended overestimate)
// of the time in nanoseconds to serve a request of the given kind.
// of the time it takes to serve a request of the given kind.
//
// TODO: seed this with empirical data.
fn hardcoded_serve_time(kind: Kind) -> u64 {
match kind {
fn hardcoded_serve_time(kind: Kind) -> Duration {
Duration::new(0, match kind {
Kind::Headers => 500_000,
Kind::HeaderProof => 500_000,
Kind::TransactionIndex => 500_000,
Expand All @@ -63,7 +63,7 @@ fn hardcoded_serve_time(kind: Kind) -> u64 {
Kind::Code => 1_500_000,
Kind::Execution => 250, // per gas.
Kind::Signal => 500_000,
}
})
}

/// A no-op store.
Expand Down Expand Up @@ -114,10 +114,10 @@ impl LoadDistribution {
}
}

/// Calculate EMA of load in nanoseconds for a specific request kind.
/// Calculate EMA of load for a specific request kind.
/// If there is no data for the given request kind, no EMA will be calculated,
/// but a hardcoded time will be returned.
pub fn expected_time_ns(&self, kind: Kind) -> u64 {
pub fn expected_time(&self, kind: Kind) -> Duration {
let samples = self.samples.read();
samples.get(&kind).and_then(|s| {
if s.len() == 0 { return None }
Expand All @@ -128,7 +128,9 @@ impl LoadDistribution {
(alpha * c as f64) + ((1.0 - alpha) * a)
});

Some(ema as u64)
// TODO: use `Duration::from_nanos` once stable (https://github.com/rust-lang/rust/issues/46507)
let ema = ema as u64;
Some(Duration::new(ema / 1_000_000_000, (ema % 1_000_000_000) as u32))
}).unwrap_or_else(move || hardcoded_serve_time(kind))
}

Expand Down Expand Up @@ -223,12 +225,12 @@ mod tests {
#[test]
fn hardcoded_before_data() {
let dist = LoadDistribution::load(&NullStore);
assert_eq!(dist.expected_time_ns(Kind::Headers), hardcoded_serve_time(Kind::Headers));
assert_eq!(dist.expected_time(Kind::Headers), hardcoded_serve_time(Kind::Headers));

dist.update(Kind::Headers, Duration::new(0, 100_000), 100);
dist.end_period(&NullStore);

assert_eq!(dist.expected_time_ns(Kind::Headers), 1000);
assert_eq!(dist.expected_time(Kind::Headers), Duration::new(0, 1000));
}

#[test]
Expand All @@ -244,20 +246,20 @@ mod tests {
sum += x;
if i == 0 { continue }

let moving_average = dist.expected_time_ns(Kind::Headers);
let moving_average = dist.expected_time(Kind::Headers);

// should be weighted below the maximum entry.
let arith_average = (sum as f64 / (i + 1) as f64) as u64;
assert!(moving_average < x as u64);
let arith_average = (sum as f64 / (i + 1) as f64) as u32;
assert!(moving_average < Duration::new(0, x));

// when there are only 2 entries, they should be equal due to choice of
// ALPHA = 1/N.
// otherwise, the weight should be below the arithmetic mean because the much
// smaller previous values are discounted less.
if i == 1 {
assert_eq!(moving_average, arith_average);
assert_eq!(moving_average, Duration::new(0, arith_average));
} else {
assert!(moving_average < arith_average)
assert!(moving_average < Duration::new(0, arith_average))
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ pub use self::load_timer::{SampleStore, FileStore};
pub use self::status::{Status, Capabilities, Announcement};

const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000;
const TIMEOUT_INTERVAL: Duration = Duration::from_secs(1);

const TICK_TIMEOUT: TimerToken = 1;
const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000;
const TICK_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);

const PROPAGATE_TIMEOUT: TimerToken = 2;
const PROPAGATE_TIMEOUT_INTERVAL_MS: u64 = 5000;
const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);

const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL_MS: u64 = 60 * 60 * 1000;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);

// minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
Expand Down Expand Up @@ -369,9 +369,9 @@ impl LightProtocol {
let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore));
let load_distribution = LoadDistribution::load(&*sample_store);
let flow_params = FlowParams::from_request_times(
|kind| load_distribution.expected_time_ns(kind),
|kind| load_distribution.expected_time(kind),
params.config.load_share,
params.config.max_stored_seconds,
Duration::from_secs(params.config.max_stored_seconds),
);

LightProtocol {
Expand Down Expand Up @@ -766,9 +766,9 @@ impl LightProtocol {
self.load_distribution.end_period(&*self.sample_store);

let new_params = Arc::new(FlowParams::from_request_times(
|kind| self.load_distribution.expected_time_ns(kind),
|kind| self.load_distribution.expected_time(kind),
self.config.load_share,
self.config.max_stored_seconds,
Duration::from_secs(self.config.max_stored_seconds),
));
*self.flow_params.write() = new_params.clone();

Expand Down Expand Up @@ -1080,13 +1080,13 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) {

impl NetworkProtocolHandler for LightProtocol {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS)
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS)
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL_MS)
io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL_MS)
io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL)
.expect("Error registering request timer interval token.");
}

Expand Down
33 changes: 20 additions & 13 deletions ethcore/light/src/net/request_credits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,30 @@ impl FlowParams {

/// Create new flow parameters from ,
/// proportion of total capacity which should be given to a peer,
/// and number of seconds of stored capacity a peer can accumulate.
pub fn from_request_times<F: Fn(::request::Kind) -> u64>(
request_time_ns: F,
/// and stored capacity a peer can accumulate.
pub fn from_request_times<F: Fn(::request::Kind) -> Duration>(
request_time: F,
load_share: f64,
max_stored_seconds: u64
max_stored: Duration
) -> Self {
use request::Kind;

let load_share = load_share.abs();

let recharge: u64 = 100_000_000;
let max = recharge.saturating_mul(max_stored_seconds);
let max = {
let sec = max_stored.as_secs().saturating_mul(recharge);
let nanos = (max_stored.subsec_nanos() as u64).saturating_mul(recharge) / 1_000_000_000;
sec + nanos
};

let cost_for_kind = |kind| {
// how many requests we can handle per second
let ns = request_time_ns(kind);
let second_duration = 1_000_000_000f64 / ns as f64;
let rq_dur = request_time(kind);
let second_duration = {
let as_ns = rq_dur.as_secs() as f64 * 1_000_000_000f64 + rq_dur.subsec_nanos() as f64;
1_000_000_000f64 / as_ns
};

// scale by share of the load given to this peer.
let serve_per_second = second_duration * load_share;
Expand Down Expand Up @@ -426,21 +433,21 @@ mod tests {
#[test]
fn scale_by_load_share_and_time() {
let flow_params = FlowParams::from_request_times(
|_| 10_000,
|_| Duration::new(0, 10_000),
0.05,
60,
Duration::from_secs(60),
);

let flow_params2 = FlowParams::from_request_times(
|_| 10_000,
|_| Duration::new(0, 10_000),
0.1,
60,
Duration::from_secs(60),
);

let flow_params3 = FlowParams::from_request_times(
|_| 5_000,
|_| Duration::new(0, 5_000),
0.05,
60,
Duration::from_secs(60),
);

assert_eq!(flow_params2.costs, flow_params3.costs);
Expand Down
9 changes: 5 additions & 4 deletions ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use std::sync::Arc;
use std::path::Path;
use std::time::Duration;

use ansi_term::Colour;
use io::{IoContext, TimerToken, IoHandler, IoService, IoError};
Expand Down Expand Up @@ -173,13 +174,13 @@ struct ClientIoHandler {
const CLIENT_TICK_TIMER: TimerToken = 0;
const SNAPSHOT_TICK_TIMER: TimerToken = 1;

const CLIENT_TICK_MS: u64 = 5000;
const SNAPSHOT_TICK_MS: u64 = 10000;
const CLIENT_TICK: Duration = Duration::from_secs(5);
const SNAPSHOT_TICK: Duration = Duration::from_secs(10);

impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer");
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK).expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK).expect("Error registering snapshot timer");
}

fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
Expand Down
10 changes: 5 additions & 5 deletions ethcore/src/account_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ impl AccountProvider {
}

/// Unlocks account temporarily with a timeout.
pub fn unlock_account_timed(&self, account: Address, password: String, duration_ms: u32) -> Result<(), Error> {
self.unlock_account(account, password, Unlock::Timed(Instant::now() + Duration::from_millis(duration_ms as u64)))
pub fn unlock_account_timed(&self, account: Address, password: String, duration: Duration) -> Result<(), Error> {
self.unlock_account(account, password, Unlock::Timed(Instant::now() + duration))
}

/// Checks if given account is unlocked
Expand Down Expand Up @@ -837,7 +837,7 @@ impl AccountProvider {
#[cfg(test)]
mod tests {
use super::{AccountProvider, Unlock, DappId};
use std::time::Instant;
use std::time::{Duration, Instant};
use ethstore::ethkey::{Generator, Random, Address};
use ethstore::{StoreAccountRef, Derivation};
use ethereum_types::H256;
Expand Down Expand Up @@ -941,8 +941,8 @@ mod tests {
let kp = Random.generate().unwrap();
let ap = AccountProvider::transient_provider();
assert!(ap.insert_account(kp.secret().clone(), "test").is_ok());
assert!(ap.unlock_account_timed(kp.address(), "test1".into(), 60000).is_err());
assert!(ap.unlock_account_timed(kp.address(), "test".into(), 60000).is_ok());
assert!(ap.unlock_account_timed(kp.address(), "test1".into(), Duration::from_secs(60)).is_err());
assert!(ap.unlock_account_timed(kp.address(), "test".into(), Duration::from_secs(60)).is_ok());
assert!(ap.sign(kp.address(), None, Default::default()).is_ok());
ap.unlocked.write().get_mut(&StoreAccountRef::root(kp.address())).unwrap().unlock = Unlock::Timed(Instant::now());
assert!(ap.sign(kp.address(), None, Default::default()).is_err());
Expand Down
9 changes: 5 additions & 4 deletions ethcore/src/engines/authority_round/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::fmt;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Weak, Arc};
use std::time::{UNIX_EPOCH, Duration};
use std::time::{UNIX_EPOCH, SystemTime, Duration};
use std::collections::{BTreeMap, HashSet};
use std::iter::FromIterator;

Expand Down Expand Up @@ -536,6 +536,7 @@ fn verify_timestamp(step: &Step, header_step: usize) -> Result<(), BlockError> {
// NOTE This error might be returned only in early stage of verification (Stage 1).
// Returning it further won't recover the sync process.
trace!(target: "engine", "verify_timestamp: block too early");
let oob = oob.map(|n| SystemTime::now() + Duration::from_secs(n));
Err(BlockError::TemporarilyInvalid(oob).into())
},
Ok(_) => Ok(()),
Expand Down Expand Up @@ -694,8 +695,8 @@ const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
impl IoHandler<()> for TransitionHandler {
fn initialize(&self, io: &IoContext<()>) {
if let Some(engine) = self.engine.upgrade() {
let remaining = engine.step.duration_remaining();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, remaining.as_millis())
let remaining = engine.step.duration_remaining().as_millis();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
}
}
Expand All @@ -711,7 +712,7 @@ impl IoHandler<()> for TransitionHandler {
}

let next_run_at = engine.step.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, next_run_at)
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}
}
Expand Down
3 changes: 1 addition & 2 deletions ethcore/src/engines/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ impl<S, M: Machine> TransitionHandler<S, M> where S: Sync + Send + Clone {
pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;

fn set_timeout<S: Sync + Send + Clone>(io: &IoContext<S>, timeout: Duration) {
let ms = timeout.as_secs() * 1_000 + timeout.subsec_nanos() as u64 / 1_000_000;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, ms)
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, timeout)
.unwrap_or_else(|e| warn!(target: "engine", "Failed to set consensus step timeout: {}.", e))
}

Expand Down
15 changes: 11 additions & 4 deletions ethcore/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! General error types for use in ethcore.

use std::{fmt, error};
use std::time::SystemTime;
use kvdb;
use ethereum_types::{H256, U256, Address, Bloom};
use util_error::UtilError;
Expand Down Expand Up @@ -81,9 +82,9 @@ pub enum BlockError {
/// Receipts trie root header field is invalid.
InvalidReceiptsRoot(Mismatch<H256>),
/// Timestamp header field is invalid.
InvalidTimestamp(OutOfBounds<u64>),
InvalidTimestamp(OutOfBounds<SystemTime>),
/// Timestamp header field is too far in future.
TemporarilyInvalid(OutOfBounds<u64>),
TemporarilyInvalid(OutOfBounds<SystemTime>),
/// Log bloom header field is invalid.
InvalidLogBloom(Mismatch<Bloom>),
/// Number field of header is invalid.
Expand Down Expand Up @@ -125,8 +126,14 @@ impl fmt::Display for BlockError {
InvalidSeal => "Block has invalid seal.".into(),
InvalidGasLimit(ref oob) => format!("Invalid gas limit: {}", oob),
InvalidReceiptsRoot(ref mis) => format!("Invalid receipts trie root in header: {}", mis),
InvalidTimestamp(ref oob) => format!("Invalid timestamp in header: {}", oob),
TemporarilyInvalid(ref oob) => format!("Future timestamp in header: {}", oob),
InvalidTimestamp(ref oob) => {
let oob = oob.map(|st| st.elapsed().unwrap_or_default().as_secs());
format!("Invalid timestamp in header: {}", oob)
},
TemporarilyInvalid(ref oob) => {
let oob = oob.map(|st| st.elapsed().unwrap_or_default().as_secs());
format!("Future timestamp in header: {}", oob)
},
InvalidLogBloom(ref oob) => format!("Invalid log bloom in header: {}", oob),
InvalidNumber(ref mis) => format!("Invalid number in header: {}", mis),
RidiculousNumber(ref oob) => format!("Implausible block number. {}", oob),
Expand Down
Loading