diff --git a/rt/src/process.rs b/rt/src/process.rs index 445b3af79..0e77a8c7a 100644 --- a/rt/src/process.rs +++ b/rt/src/process.rs @@ -938,7 +938,7 @@ impl Channel { mod tests { use super::*; use crate::mem::tagged_int; - use crate::test::{empty_class, empty_process_class, OwnedProcess}; + use crate::test::{empty_class, empty_process_class, setup, OwnedProcess}; use std::time::Duration; macro_rules! offset_of { @@ -1077,62 +1077,65 @@ mod tests { #[test] fn test_process_state_has_same_timeout() { - let mut state = ProcessState::new(); - let timeout = Timeout::with_rc(Duration::from_secs(0)); + let state = setup(); + let mut proc_state = ProcessState::new(); + let timeout = Timeout::duration(&state, Duration::from_secs(0)); - assert!(!state.has_same_timeout(&timeout)); + assert!(!proc_state.has_same_timeout(&timeout)); - state.timeout = Some(timeout.clone()); + proc_state.timeout = Some(timeout.clone()); - assert!(state.has_same_timeout(&timeout)); + assert!(proc_state.has_same_timeout(&timeout)); } #[test] fn test_process_state_try_reschedule_after_timeout() { - let mut state = ProcessState::new(); + let state = setup(); + let mut proc_state = ProcessState::new(); assert_eq!( - state.try_reschedule_after_timeout(), + proc_state.try_reschedule_after_timeout(), RescheduleRights::Failed ); - state.waiting_for_channel(None); + proc_state.waiting_for_channel(None); assert_eq!( - state.try_reschedule_after_timeout(), + proc_state.try_reschedule_after_timeout(), RescheduleRights::Acquired ); - assert!(!state.status.is_waiting_for_channel()); - assert!(!state.status.is_waiting()); + assert!(!proc_state.status.is_waiting_for_channel()); + assert!(!proc_state.status.is_waiting()); - let timeout = Timeout::with_rc(Duration::from_secs(0)); + let timeout = Timeout::duration(&state, Duration::from_secs(0)); - state.waiting_for_channel(Some(timeout)); + proc_state.waiting_for_channel(Some(timeout)); assert_eq!( - state.try_reschedule_after_timeout(), + proc_state.try_reschedule_after_timeout(), RescheduleRights::AcquiredWithTimeout ); - assert!(!state.status.is_waiting_for_channel()); - assert!(!state.status.is_waiting()); + assert!(!proc_state.status.is_waiting_for_channel()); + assert!(!proc_state.status.is_waiting()); } #[test] fn test_process_state_waiting_for_channel() { - let mut state = ProcessState::new(); - let timeout = Timeout::with_rc(Duration::from_secs(0)); + let state = setup(); + let mut proc_state = ProcessState::new(); + let timeout = Timeout::duration(&state, Duration::from_secs(0)); - state.waiting_for_channel(None); + proc_state.waiting_for_channel(None); - assert!(state.status.is_waiting_for_channel()); - assert!(state.timeout.is_none()); + assert!(proc_state.status.is_waiting_for_channel()); + assert!(proc_state.timeout.is_none()); - state.waiting_for_channel(Some(timeout)); + proc_state.waiting_for_channel(Some(timeout)); - assert!(state.status.is_waiting_for_channel()); - assert!(state.timeout.is_some()); + assert!(proc_state.status.is_waiting_for_channel()); + assert!(proc_state.timeout.is_some()); } #[test] @@ -1155,28 +1158,30 @@ mod tests { #[test] fn test_process_state_try_reschedule_for_channel() { - let mut state = ProcessState::new(); + let state = setup(); + let mut proc_state = ProcessState::new(); assert_eq!( - state.try_reschedule_for_channel(), + proc_state.try_reschedule_for_channel(), RescheduleRights::Failed ); - state.status.set_waiting_for_channel(true); + proc_state.status.set_waiting_for_channel(true); assert_eq!( - state.try_reschedule_for_channel(), + proc_state.try_reschedule_for_channel(), RescheduleRights::Acquired ); - assert!(!state.status.is_waiting_for_channel()); + assert!(!proc_state.status.is_waiting_for_channel()); - state.status.set_waiting_for_channel(true); - state.timeout = Some(Timeout::with_rc(Duration::from_secs(0))); + proc_state.status.set_waiting_for_channel(true); + proc_state.timeout = + Some(Timeout::duration(&state, Duration::from_secs(0))); assert_eq!( - state.try_reschedule_for_channel(), + proc_state.try_reschedule_for_channel(), RescheduleRights::AcquiredWithTimeout ); - assert!(!state.status.is_waiting_for_channel()); + assert!(!proc_state.status.is_waiting_for_channel()); } #[test] @@ -1211,10 +1216,11 @@ mod tests { #[test] fn test_process_state_suspend() { + let state = setup(); let class = empty_process_class("A"); let stack = Stack::new(32); let process = OwnedProcess::new(Process::alloc(*class, stack)); - let timeout = Timeout::with_rc(Duration::from_secs(0)); + let timeout = Timeout::duration(&state, Duration::from_secs(0)); process.state().suspend(timeout); @@ -1224,10 +1230,11 @@ mod tests { #[test] fn test_process_timeout_expired() { + let state = setup(); let class = empty_process_class("A"); let stack = Stack::new(32); let process = OwnedProcess::new(Process::alloc(*class, stack)); - let timeout = Timeout::with_rc(Duration::from_secs(0)); + let timeout = Timeout::duration(&state, Duration::from_secs(0)); assert!(!process.timeout_expired()); @@ -1325,6 +1332,7 @@ mod tests { #[test] fn test_channel_send_with_waiting_with_timeout() { + let state = setup(); let process_class = empty_process_class("A"); let process = OwnedProcess::new(Process::alloc(*process_class, Stack::new(32))); @@ -1333,7 +1341,10 @@ mod tests { let chan = unsafe { &(*chan_ptr) }; let msg = tagged_int(42); - chan.receive(*process, Some(Timeout::with_rc(Duration::from_secs(0)))); + chan.receive( + *process, + Some(Timeout::duration(&state, Duration::from_secs(0))), + ); assert_eq!( chan.send(*process, msg as _), diff --git a/rt/src/runtime.rs b/rt/src/runtime.rs index f520714d9..2ed55114c 100644 --- a/rt/src/runtime.rs +++ b/rt/src/runtime.rs @@ -119,7 +119,7 @@ impl Runtime { .name("timeout".to_string()) .spawn(move || { pin_thread_to_core(0); - state.timeout_worker.run(&state.scheduler) + state.timeout_worker.run(&state) }) .unwrap(); diff --git a/rt/src/runtime/process.rs b/rt/src/runtime/process.rs index 1d3618386..f335c92c3 100644 --- a/rt/src/runtime/process.rs +++ b/rt/src/runtime/process.rs @@ -137,8 +137,8 @@ pub unsafe extern "system" fn inko_process_suspend( process: ProcessPointer, nanos: i64, ) -> *const Nil { - let timeout = Timeout::with_rc(Duration::from_nanos(nanos as _)); let state = &*state; + let timeout = Timeout::duration(state, Duration::from_nanos(nanos as _)); { let mut proc_state = process.state(); @@ -293,7 +293,7 @@ pub unsafe extern "system" fn inko_channel_receive_until( nanos: u64, ) -> InkoResult { let state = &(*state); - let deadline = Timeout::from_nanos_deadline(state, nanos); + let deadline = Timeout::until(nanos); loop { match (*channel).receive(process, Some(deadline.clone())) { diff --git a/rt/src/runtime/socket.rs b/rt/src/runtime/socket.rs index fd23b60e9..f07631768 100644 --- a/rt/src/runtime/socket.rs +++ b/rt/src/runtime/socket.rs @@ -31,7 +31,7 @@ fn blocking( // A deadline of -1 signals that we should wait indefinitely. if deadline >= 0 { - let time = Timeout::from_nanos_deadline(state, deadline as u64); + let time = Timeout::until(deadline as u64); proc_state.waiting_for_io(Some(time.clone())); state.timeout_worker.suspend(process, time); diff --git a/rt/src/scheduler/timeout_worker.rs b/rt/src/scheduler/timeout_worker.rs index 2bcc6fa13..b2f92d5d2 100644 --- a/rt/src/scheduler/timeout_worker.rs +++ b/rt/src/scheduler/timeout_worker.rs @@ -3,6 +3,7 @@ use crate::arc_without_weak::ArcWithoutWeak; use crate::process::ProcessPointer; use crate::scheduler::process::Scheduler; use crate::scheduler::timeouts::{Timeout, Timeouts}; +use crate::state::State; use std::cell::UnsafeCell; use std::collections::VecDeque; use std::mem::size_of; @@ -88,11 +89,11 @@ impl TimeoutWorker { self.expired.fetch_add(1, Ordering::AcqRel); } - pub(crate) fn run(&self, scheduler: &Scheduler) { - while scheduler.is_alive() { - let timeout = self.run_iteration(scheduler); + pub(crate) fn run(&self, state: &State) { + while state.scheduler.is_alive() { + let timeout = self.run_iteration(state); - self.sleep(scheduler, timeout); + self.sleep(&state.scheduler, timeout); } } @@ -107,12 +108,12 @@ impl TimeoutWorker { self.cvar.notify_one(); } - fn run_iteration(&self, scheduler: &Scheduler) -> Option { + fn run_iteration(&self, state: &State) -> Option { self.move_messages(); self.defragment_heap(); self.handle_pending_messages(); - if let Some(time) = self.reschedule_expired_processes(scheduler) { + if let Some(time) = self.reschedule_expired_processes(state) { if time.as_millis() < (MIN_SLEEP_TIME as u128) { Some(Duration::from_millis(MIN_SLEEP_TIME)) } else { @@ -141,15 +142,12 @@ impl TimeoutWorker { } } - fn reschedule_expired_processes( - &self, - scheduler: &Scheduler, - ) -> Option { + fn reschedule_expired_processes(&self, state: &State) -> Option { let inner = self.inner_mut(); let (expired, time_until_expiration) = - inner.timeouts.processes_to_reschedule(); + inner.timeouts.processes_to_reschedule(state); - scheduler.schedule_multiple(expired); + state.scheduler.schedule_multiple(expired); time_until_expiration } @@ -190,9 +188,8 @@ impl TimeoutWorker { mod tests { use super::*; use crate::process::Process; - use crate::scheduler::process::Scheduler; use crate::stack::Stack; - use crate::test::{empty_process_class, new_process}; + use crate::test::{empty_process_class, new_process, setup}; #[test] fn test_new() { @@ -204,11 +201,15 @@ mod tests { #[test] fn test_suspend() { + let state = setup(); let worker = TimeoutWorker::new(); let class = empty_process_class("A"); let process = new_process(*class); - worker.suspend(*process, Timeout::with_rc(Duration::from_secs(1))); + worker.suspend( + *process, + Timeout::duration(&state, Duration::from_secs(1)), + ); assert!(!worker.queue.lock().unwrap().is_empty()); } @@ -224,13 +225,13 @@ mod tests { #[test] fn test_run_with_fragmented_heap() { + let state = setup(); let class = empty_process_class("A"); let process = Process::alloc(*class, Stack::new(1024)); let worker = TimeoutWorker::new(); - let scheduler = Scheduler::new(1, 1, 1024); for time in &[10_u64, 5_u64] { - let timeout = Timeout::with_rc(Duration::from_secs(*time)); + let timeout = Timeout::duration(&state, Duration::from_secs(*time)); process.state().waiting_for_channel(Some(timeout.clone())); worker.suspend(process, timeout); @@ -242,7 +243,7 @@ mod tests { // loop. worker.move_messages(); worker.handle_pending_messages(); - worker.run_iteration(&scheduler); + worker.run_iteration(&state); assert_eq!(worker.inner().timeouts.len(), 1); assert_eq!(worker.expired.load(Ordering::Relaxed), 0); @@ -250,40 +251,41 @@ mod tests { #[test] fn test_run_with_message() { + let state = setup(); let class = empty_process_class("A"); let process = Process::alloc(*class, Stack::new(1024)); let worker = TimeoutWorker::new(); - let scheduler = Scheduler::new(1, 1, 1024); - let timeout = Timeout::with_rc(Duration::from_secs(10)); + let timeout = Timeout::duration(&state, Duration::from_secs(10)); process.state().waiting_for_channel(Some(timeout.clone())); worker.suspend(process, timeout); - worker.run_iteration(&scheduler); + worker.run_iteration(&state); assert_eq!(worker.inner().timeouts.len(), 1); } #[test] fn test_run_with_reschedule() { + let state = setup(); let class = empty_process_class("A"); let process = Process::alloc(*class, Stack::new(1024)); let worker = TimeoutWorker::new(); - let scheduler = Scheduler::new(1, 1, 1024); - let timeout = Timeout::with_rc(Duration::from_secs(0)); + let timeout = Timeout::duration(&state, Duration::from_secs(0)); process.state().waiting_for_channel(Some(timeout.clone())); worker.suspend(process, timeout); - worker.run_iteration(&scheduler); + worker.run_iteration(&state); assert_eq!(worker.inner().timeouts.len(), 0); } #[test] fn test_defragment_heap_without_fragmentation() { + let state = setup(); let class = empty_process_class("A"); let process = Process::alloc(*class, Stack::new(1024)); let worker = TimeoutWorker::new(); - let timeout = Timeout::with_rc(Duration::from_secs(1)); + let timeout = Timeout::duration(&state, Duration::from_secs(1)); process.state().waiting_for_channel(Some(timeout.clone())); worker.suspend(process, timeout); @@ -297,12 +299,13 @@ mod tests { #[test] fn test_defragment_heap_with_fragmentation() { + let state = setup(); let class = empty_process_class("A"); let process = Process::alloc(*class, Stack::new(1024)); let worker = TimeoutWorker::new(); for time in &[1_u64, 1_u64] { - let timeout = Timeout::with_rc(Duration::from_secs(*time)); + let timeout = Timeout::duration(&state, Duration::from_secs(*time)); process.state().waiting_for_channel(Some(timeout.clone())); worker.suspend(process, timeout); diff --git a/rt/src/scheduler/timeouts.rs b/rt/src/scheduler/timeouts.rs index 8cac32a25..9c8f7f1ff 100644 --- a/rt/src/scheduler/timeouts.rs +++ b/rt/src/scheduler/timeouts.rs @@ -9,47 +9,34 @@ use std::time::{Duration, Instant}; /// An process that should be resumed after a certain point in time. pub(crate) struct Timeout { - /// The time after which the timeout expires. - resume_after: Instant, + /// The time after which to resume, in nanoseconds since the runtime epoch. + /// + /// We use a `u64` here rather than an Instant for two reasons: + /// + /// 1. It only needs 8 bytes instead of 16 + /// 2. It makes some of the internal calculations easier due to the use of + /// our own epoch + resume_after: u64, } impl Timeout { - pub(crate) fn from_nanos_deadline( - state: &State, - nanos: u64, - ) -> ArcWithoutWeak { - let now = Instant::now(); - - // Our own monotonic clock is the time since the runtime epoch, which is - // roughly when the program first started running, so we can safely fit - // this in a Duration. - let dur = Duration::from_nanos(nanos); - - // This calculates the difference between our own monotonic clock, and - // the clock of `std::time::Instant`. We can then turn that into a - // `Duration` and add it to our `Instant` to get our deadline. - // - // Should the deadline ever be before the runtime epoch, we fall back to - // just the current time. In practise this shouldn't happen, but it's - // better to be safe than sorry. - let resume_after = dur - .checked_sub(now - state.start_time) - .map(|diff| now + diff) - .unwrap_or_else(|| now); - - ArcWithoutWeak::new(Self { resume_after }) + pub(crate) fn until(nanos: u64) -> ArcWithoutWeak { + ArcWithoutWeak::new(Timeout { resume_after: nanos }) } - pub(crate) fn new(suspend_for: Duration) -> Self { - Timeout { resume_after: Instant::now() + suspend_for } - } + pub(crate) fn duration( + state: &State, + duration: Duration, + ) -> ArcWithoutWeak { + let deadline = + (Instant::now() - state.start_time + duration).as_nanos() as u64; - pub(crate) fn with_rc(suspend_for: Duration) -> ArcWithoutWeak { - ArcWithoutWeak::new(Self::new(suspend_for)) + Timeout::until(deadline) } - pub(crate) fn remaining_time(&self) -> Option { - self.resume_after.checked_duration_since(Instant::now()) + pub(crate) fn remaining_time(&self, state: &State) -> Option { + (state.start_time + Duration::from_nanos(self.resume_after)) + .checked_duration_since(Instant::now()) } } @@ -150,19 +137,20 @@ impl Timeouts { pub(crate) fn processes_to_reschedule( &mut self, + state: &State, ) -> (Vec, Option) { let mut reschedule = Vec::new(); let mut time_until_expiration = None; while let Some(entry) = self.timeouts.pop() { - let mut state = entry.process.state(); + let mut proc_state = entry.process.state(); - if !state.has_same_timeout(&entry.timeout) { + if !proc_state.has_same_timeout(&entry.timeout) { continue; } - if let Some(duration) = entry.timeout.remaining_time() { - drop(state); + if let Some(duration) = entry.timeout.remaining_time(state) { + drop(proc_state); self.timeouts.push(entry); time_until_expiration = Some(duration); @@ -172,8 +160,8 @@ impl Timeouts { break; } - if state.try_reschedule_after_timeout().are_acquired() { - drop(state); + if proc_state.try_reschedule_after_timeout().are_acquired() { + drop(proc_state); reschedule.push(entry.process); } } @@ -198,47 +186,35 @@ impl Drop for Timeouts { mod tests { use super::*; use crate::stack::Stack; - use crate::test::{empty_process_class, new_process}; + use crate::test::{empty_process_class, new_process, setup}; + use std::mem::size_of; + use std::thread::sleep; mod timeout { use super::*; #[test] - fn test_new() { - let timeout = Timeout::new(Duration::from_secs(10)); - - // Due to the above code taking a tiny bit of time to run we can't - // assert that the "resume_after" field is _exactly_ 10 seconds from - // now. - let after = Instant::now() + Duration::from_secs(9); - - assert!(timeout.resume_after >= after); - } - - #[test] - fn test_with_rc() { - let timeout = Timeout::with_rc(Duration::from_secs(10)); - let after = Instant::now() + Duration::from_secs(9); - - assert!(timeout.resume_after >= after); + fn test_type_size() { + assert_eq!(size_of::(), 8); } #[test] fn test_remaining_time_with_remaining_time() { - let timeout = Timeout::new(Duration::from_secs(10)); - let remaining = timeout.remaining_time(); + let state = setup(); + let timeout = Timeout::duration(&state, Duration::from_secs(10)); + let remaining = timeout.remaining_time(&state); - assert!(remaining.is_some()); - assert!(remaining.unwrap() >= Duration::from_secs(9)); + assert!(remaining >= Some(Duration::from_secs(9))); } #[test] fn test_remaining_time_without_remaining_time() { - let timeout = Timeout { - resume_after: Instant::now() - Duration::from_secs(1), - }; + let state = setup(); + let timeout = Timeout::duration(&state, Duration::from_nanos(0)); + let remaining = timeout.remaining_time(&state); - assert!(timeout.remaining_time().is_none()); + sleep(Duration::from_millis(10)); + assert!(remaining.is_none()); } } @@ -249,16 +225,17 @@ mod tests { #[test] fn test_partial_cmp() { + let state = setup(); let class = empty_process_class("A"); let process = new_process(*class); let entry1 = TimeoutEntry::new( *process, - Timeout::with_rc(Duration::from_secs(1)), + Timeout::duration(&state, Duration::from_secs(1)), ); let entry2 = TimeoutEntry::new( *process, - Timeout::with_rc(Duration::from_secs(5)), + Timeout::duration(&state, Duration::from_secs(5)), ); assert_eq!( @@ -271,16 +248,17 @@ mod tests { #[test] fn test_cmp() { + let state = setup(); let class = empty_process_class("A"); let process = new_process(*class); let entry1 = TimeoutEntry::new( *process, - Timeout::with_rc(Duration::from_secs(1)), + Timeout::duration(&state, Duration::from_secs(1)), ); let entry2 = TimeoutEntry::new( *process, - Timeout::with_rc(Duration::from_secs(5)), + Timeout::duration(&state, Duration::from_secs(5)), ); assert_eq!(entry1.cmp(&entry2), cmp::Ordering::Greater); @@ -289,16 +267,17 @@ mod tests { #[test] fn test_eq() { + let state = setup(); let class = empty_process_class("A"); let process = new_process(*class); let entry1 = TimeoutEntry::new( *process, - Timeout::with_rc(Duration::from_secs(1)), + Timeout::duration(&state, Duration::from_secs(1)), ); let entry2 = TimeoutEntry::new( *process, - Timeout::with_rc(Duration::from_secs(5)), + Timeout::duration(&state, Duration::from_secs(5)), ); assert!(entry1 == entry1); @@ -311,10 +290,11 @@ mod tests { #[test] fn test_insert() { + let state = setup(); let class = empty_process_class("A"); let process = new_process(*class); let mut timeouts = Timeouts::new(); - let timeout = Timeout::with_rc(Duration::from_secs(10)); + let timeout = Timeout::duration(&state, Duration::from_secs(10)); timeouts.insert(*process, timeout); @@ -323,10 +303,11 @@ mod tests { #[test] fn test_len() { + let state = setup(); let class = empty_process_class("A"); let process = new_process(*class); let mut timeouts = Timeouts::new(); - let timeout = Timeout::with_rc(Duration::from_secs(10)); + let timeout = Timeout::duration(&state, Duration::from_secs(10)); timeouts.insert(*process, timeout); @@ -335,10 +316,11 @@ mod tests { #[test] fn test_remove_invalid_entries_with_valid_entries() { + let state = setup(); let class = empty_process_class("A"); let process = Process::alloc(*class, Stack::new(1024)); let mut timeouts = Timeouts::new(); - let timeout = Timeout::with_rc(Duration::from_secs(10)); + let timeout = Timeout::duration(&state, Duration::from_secs(10)); process.state().waiting_for_channel(Some(timeout.clone())); timeouts.insert(process, timeout); @@ -349,10 +331,11 @@ mod tests { #[test] fn test_remove_invalid_entries_with_invalid_entries() { + let state = setup(); let class = empty_process_class("A"); let process = new_process(*class); let mut timeouts = Timeouts::new(); - let timeout = Timeout::with_rc(Duration::from_secs(10)); + let timeout = Timeout::duration(&state, Duration::from_secs(10)); timeouts.insert(*process, timeout); @@ -362,14 +345,16 @@ mod tests { #[test] fn test_processes_to_reschedule_with_invalid_entries() { + let state = setup(); let class = empty_process_class("A"); let process = new_process(*class); let mut timeouts = Timeouts::new(); - let timeout = Timeout::with_rc(Duration::from_secs(10)); + let timeout = Timeout::duration(&state, Duration::from_secs(10)); timeouts.insert(*process, timeout); - let (reschedule, expiration) = timeouts.processes_to_reschedule(); + let (reschedule, expiration) = + timeouts.processes_to_reschedule(&state); assert!(reschedule.is_empty()); assert!(expiration.is_none()); @@ -377,15 +362,17 @@ mod tests { #[test] fn test_processes_to_reschedule_with_remaining_time() { + let state = setup(); let class = empty_process_class("A"); let process = Process::alloc(*class, Stack::new(1024)); let mut timeouts = Timeouts::new(); - let timeout = Timeout::with_rc(Duration::from_secs(10)); + let timeout = Timeout::duration(&state, Duration::from_secs(10)); process.state().waiting_for_channel(Some(timeout.clone())); timeouts.insert(process, timeout); - let (reschedule, expiration) = timeouts.processes_to_reschedule(); + let (reschedule, expiration) = + timeouts.processes_to_reschedule(&state); assert!(reschedule.is_empty()); assert!(expiration.is_some()); @@ -394,15 +381,17 @@ mod tests { #[test] fn test_processes_to_reschedule_with_entries_to_reschedule() { + let state = setup(); let class = empty_process_class("A"); let process = new_process(*class); let mut timeouts = Timeouts::new(); - let timeout = Timeout::with_rc(Duration::from_secs(0)); + let timeout = Timeout::duration(&state, Duration::from_secs(0)); process.state().waiting_for_channel(Some(timeout.clone())); timeouts.insert(*process, timeout); - let (reschedule, expiration) = timeouts.processes_to_reschedule(); + let (reschedule, expiration) = + timeouts.processes_to_reschedule(&state); assert_eq!(reschedule.len(), 1); assert!(expiration.is_none());