Skip to content

Commit

Permalink
Use a u64 for timeouts
Browse files Browse the repository at this point in the history
Rust's Instant type is 16 bytes, using a u64 we can reduce this to 8
bytes. In addition, a u64 simplifies some of the internal timeout
calculations.

Changelog: performance
  • Loading branch information
yorickpeterse committed May 23, 2023
1 parent b3f9e42 commit e611ace
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 148 deletions.
85 changes: 48 additions & 37 deletions rt/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ impl Channel {
mod tests {
use super::*;
use crate::mem::tagged_int;
use crate::test::{empty_class, empty_process_class, OwnedProcess};
use crate::test::{empty_class, empty_process_class, setup, OwnedProcess};
use std::time::Duration;

macro_rules! offset_of {
Expand Down Expand Up @@ -1077,62 +1077,65 @@ mod tests {

#[test]
fn test_process_state_has_same_timeout() {
let mut state = ProcessState::new();
let timeout = Timeout::with_rc(Duration::from_secs(0));
let state = setup();
let mut proc_state = ProcessState::new();
let timeout = Timeout::duration(&state, Duration::from_secs(0));

assert!(!state.has_same_timeout(&timeout));
assert!(!proc_state.has_same_timeout(&timeout));

state.timeout = Some(timeout.clone());
proc_state.timeout = Some(timeout.clone());

assert!(state.has_same_timeout(&timeout));
assert!(proc_state.has_same_timeout(&timeout));
}

#[test]
fn test_process_state_try_reschedule_after_timeout() {
let mut state = ProcessState::new();
let state = setup();
let mut proc_state = ProcessState::new();

assert_eq!(
state.try_reschedule_after_timeout(),
proc_state.try_reschedule_after_timeout(),
RescheduleRights::Failed
);

state.waiting_for_channel(None);
proc_state.waiting_for_channel(None);

assert_eq!(
state.try_reschedule_after_timeout(),
proc_state.try_reschedule_after_timeout(),
RescheduleRights::Acquired
);

assert!(!state.status.is_waiting_for_channel());
assert!(!state.status.is_waiting());
assert!(!proc_state.status.is_waiting_for_channel());
assert!(!proc_state.status.is_waiting());

let timeout = Timeout::with_rc(Duration::from_secs(0));
let timeout = Timeout::duration(&state, Duration::from_secs(0));

state.waiting_for_channel(Some(timeout));
proc_state.waiting_for_channel(Some(timeout));

assert_eq!(
state.try_reschedule_after_timeout(),
proc_state.try_reschedule_after_timeout(),
RescheduleRights::AcquiredWithTimeout
);

assert!(!state.status.is_waiting_for_channel());
assert!(!state.status.is_waiting());
assert!(!proc_state.status.is_waiting_for_channel());
assert!(!proc_state.status.is_waiting());
}

#[test]
fn test_process_state_waiting_for_channel() {
let mut state = ProcessState::new();
let timeout = Timeout::with_rc(Duration::from_secs(0));
let state = setup();
let mut proc_state = ProcessState::new();
let timeout = Timeout::duration(&state, Duration::from_secs(0));

state.waiting_for_channel(None);
proc_state.waiting_for_channel(None);

assert!(state.status.is_waiting_for_channel());
assert!(state.timeout.is_none());
assert!(proc_state.status.is_waiting_for_channel());
assert!(proc_state.timeout.is_none());

state.waiting_for_channel(Some(timeout));
proc_state.waiting_for_channel(Some(timeout));

assert!(state.status.is_waiting_for_channel());
assert!(state.timeout.is_some());
assert!(proc_state.status.is_waiting_for_channel());
assert!(proc_state.timeout.is_some());
}

#[test]
Expand All @@ -1155,28 +1158,30 @@ mod tests {

#[test]
fn test_process_state_try_reschedule_for_channel() {
let mut state = ProcessState::new();
let state = setup();
let mut proc_state = ProcessState::new();

assert_eq!(
state.try_reschedule_for_channel(),
proc_state.try_reschedule_for_channel(),
RescheduleRights::Failed
);

state.status.set_waiting_for_channel(true);
proc_state.status.set_waiting_for_channel(true);
assert_eq!(
state.try_reschedule_for_channel(),
proc_state.try_reschedule_for_channel(),
RescheduleRights::Acquired
);
assert!(!state.status.is_waiting_for_channel());
assert!(!proc_state.status.is_waiting_for_channel());

state.status.set_waiting_for_channel(true);
state.timeout = Some(Timeout::with_rc(Duration::from_secs(0)));
proc_state.status.set_waiting_for_channel(true);
proc_state.timeout =
Some(Timeout::duration(&state, Duration::from_secs(0)));

assert_eq!(
state.try_reschedule_for_channel(),
proc_state.try_reschedule_for_channel(),
RescheduleRights::AcquiredWithTimeout
);
assert!(!state.status.is_waiting_for_channel());
assert!(!proc_state.status.is_waiting_for_channel());
}

#[test]
Expand Down Expand Up @@ -1211,10 +1216,11 @@ mod tests {

#[test]
fn test_process_state_suspend() {
let state = setup();
let class = empty_process_class("A");
let stack = Stack::new(32);
let process = OwnedProcess::new(Process::alloc(*class, stack));
let timeout = Timeout::with_rc(Duration::from_secs(0));
let timeout = Timeout::duration(&state, Duration::from_secs(0));

process.state().suspend(timeout);

Expand All @@ -1224,10 +1230,11 @@ mod tests {

#[test]
fn test_process_timeout_expired() {
let state = setup();
let class = empty_process_class("A");
let stack = Stack::new(32);
let process = OwnedProcess::new(Process::alloc(*class, stack));
let timeout = Timeout::with_rc(Duration::from_secs(0));
let timeout = Timeout::duration(&state, Duration::from_secs(0));

assert!(!process.timeout_expired());

Expand Down Expand Up @@ -1325,6 +1332,7 @@ mod tests {

#[test]
fn test_channel_send_with_waiting_with_timeout() {
let state = setup();
let process_class = empty_process_class("A");
let process =
OwnedProcess::new(Process::alloc(*process_class, Stack::new(32)));
Expand All @@ -1333,7 +1341,10 @@ mod tests {
let chan = unsafe { &(*chan_ptr) };
let msg = tagged_int(42);

chan.receive(*process, Some(Timeout::with_rc(Duration::from_secs(0))));
chan.receive(
*process,
Some(Timeout::duration(&state, Duration::from_secs(0))),
);

assert_eq!(
chan.send(*process, msg as _),
Expand Down
2 changes: 1 addition & 1 deletion rt/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Runtime {
.name("timeout".to_string())
.spawn(move || {
pin_thread_to_core(0);
state.timeout_worker.run(&state.scheduler)
state.timeout_worker.run(&state)
})
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions rt/src/runtime/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ pub unsafe extern "system" fn inko_process_suspend(
process: ProcessPointer,
nanos: i64,
) -> *const Nil {
let timeout = Timeout::with_rc(Duration::from_nanos(nanos as _));
let state = &*state;
let timeout = Timeout::duration(state, Duration::from_nanos(nanos as _));

{
let mut proc_state = process.state();
Expand Down Expand Up @@ -293,7 +293,7 @@ pub unsafe extern "system" fn inko_channel_receive_until(
nanos: u64,
) -> InkoResult {
let state = &(*state);
let deadline = Timeout::from_nanos_deadline(state, nanos);
let deadline = Timeout::until(nanos);

loop {
match (*channel).receive(process, Some(deadline.clone())) {
Expand Down
2 changes: 1 addition & 1 deletion rt/src/runtime/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn blocking<T>(

// A deadline of -1 signals that we should wait indefinitely.
if deadline >= 0 {
let time = Timeout::from_nanos_deadline(state, deadline as u64);
let time = Timeout::until(deadline as u64);

proc_state.waiting_for_io(Some(time.clone()));
state.timeout_worker.suspend(process, time);
Expand Down
55 changes: 29 additions & 26 deletions rt/src/scheduler/timeout_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::arc_without_weak::ArcWithoutWeak;
use crate::process::ProcessPointer;
use crate::scheduler::process::Scheduler;
use crate::scheduler::timeouts::{Timeout, Timeouts};
use crate::state::State;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::mem::size_of;
Expand Down Expand Up @@ -88,11 +89,11 @@ impl TimeoutWorker {
self.expired.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn run(&self, scheduler: &Scheduler) {
while scheduler.is_alive() {
let timeout = self.run_iteration(scheduler);
pub(crate) fn run(&self, state: &State) {
while state.scheduler.is_alive() {
let timeout = self.run_iteration(state);

self.sleep(scheduler, timeout);
self.sleep(&state.scheduler, timeout);
}
}

Expand All @@ -107,12 +108,12 @@ impl TimeoutWorker {
self.cvar.notify_one();
}

fn run_iteration(&self, scheduler: &Scheduler) -> Option<Duration> {
fn run_iteration(&self, state: &State) -> Option<Duration> {
self.move_messages();
self.defragment_heap();
self.handle_pending_messages();

if let Some(time) = self.reschedule_expired_processes(scheduler) {
if let Some(time) = self.reschedule_expired_processes(state) {
if time.as_millis() < (MIN_SLEEP_TIME as u128) {
Some(Duration::from_millis(MIN_SLEEP_TIME))
} else {
Expand Down Expand Up @@ -141,15 +142,12 @@ impl TimeoutWorker {
}
}

fn reschedule_expired_processes(
&self,
scheduler: &Scheduler,
) -> Option<Duration> {
fn reschedule_expired_processes(&self, state: &State) -> Option<Duration> {
let inner = self.inner_mut();
let (expired, time_until_expiration) =
inner.timeouts.processes_to_reschedule();
inner.timeouts.processes_to_reschedule(state);

scheduler.schedule_multiple(expired);
state.scheduler.schedule_multiple(expired);
time_until_expiration
}

Expand Down Expand Up @@ -190,9 +188,8 @@ impl TimeoutWorker {
mod tests {
use super::*;
use crate::process::Process;
use crate::scheduler::process::Scheduler;
use crate::stack::Stack;
use crate::test::{empty_process_class, new_process};
use crate::test::{empty_process_class, new_process, setup};

#[test]
fn test_new() {
Expand All @@ -204,11 +201,15 @@ mod tests {

#[test]
fn test_suspend() {
let state = setup();
let worker = TimeoutWorker::new();
let class = empty_process_class("A");
let process = new_process(*class);

worker.suspend(*process, Timeout::with_rc(Duration::from_secs(1)));
worker.suspend(
*process,
Timeout::duration(&state, Duration::from_secs(1)),
);

assert!(!worker.queue.lock().unwrap().is_empty());
}
Expand All @@ -224,13 +225,13 @@ mod tests {

#[test]
fn test_run_with_fragmented_heap() {
let state = setup();
let class = empty_process_class("A");
let process = Process::alloc(*class, Stack::new(1024));
let worker = TimeoutWorker::new();
let scheduler = Scheduler::new(1, 1, 1024);

for time in &[10_u64, 5_u64] {
let timeout = Timeout::with_rc(Duration::from_secs(*time));
let timeout = Timeout::duration(&state, Duration::from_secs(*time));

process.state().waiting_for_channel(Some(timeout.clone()));
worker.suspend(process, timeout);
Expand All @@ -242,48 +243,49 @@ mod tests {
// loop.
worker.move_messages();
worker.handle_pending_messages();
worker.run_iteration(&scheduler);
worker.run_iteration(&state);

assert_eq!(worker.inner().timeouts.len(), 1);
assert_eq!(worker.expired.load(Ordering::Relaxed), 0);
}

#[test]
fn test_run_with_message() {
let state = setup();
let class = empty_process_class("A");
let process = Process::alloc(*class, Stack::new(1024));
let worker = TimeoutWorker::new();
let scheduler = Scheduler::new(1, 1, 1024);
let timeout = Timeout::with_rc(Duration::from_secs(10));
let timeout = Timeout::duration(&state, Duration::from_secs(10));

process.state().waiting_for_channel(Some(timeout.clone()));
worker.suspend(process, timeout);
worker.run_iteration(&scheduler);
worker.run_iteration(&state);

assert_eq!(worker.inner().timeouts.len(), 1);
}

#[test]
fn test_run_with_reschedule() {
let state = setup();
let class = empty_process_class("A");
let process = Process::alloc(*class, Stack::new(1024));
let worker = TimeoutWorker::new();
let scheduler = Scheduler::new(1, 1, 1024);
let timeout = Timeout::with_rc(Duration::from_secs(0));
let timeout = Timeout::duration(&state, Duration::from_secs(0));

process.state().waiting_for_channel(Some(timeout.clone()));
worker.suspend(process, timeout);
worker.run_iteration(&scheduler);
worker.run_iteration(&state);

assert_eq!(worker.inner().timeouts.len(), 0);
}

#[test]
fn test_defragment_heap_without_fragmentation() {
let state = setup();
let class = empty_process_class("A");
let process = Process::alloc(*class, Stack::new(1024));
let worker = TimeoutWorker::new();
let timeout = Timeout::with_rc(Duration::from_secs(1));
let timeout = Timeout::duration(&state, Duration::from_secs(1));

process.state().waiting_for_channel(Some(timeout.clone()));
worker.suspend(process, timeout);
Expand All @@ -297,12 +299,13 @@ mod tests {

#[test]
fn test_defragment_heap_with_fragmentation() {
let state = setup();
let class = empty_process_class("A");
let process = Process::alloc(*class, Stack::new(1024));
let worker = TimeoutWorker::new();

for time in &[1_u64, 1_u64] {
let timeout = Timeout::with_rc(Duration::from_secs(*time));
let timeout = Timeout::duration(&state, Duration::from_secs(*time));

process.state().waiting_for_channel(Some(timeout.clone()));
worker.suspend(process, timeout);
Expand Down
Loading

0 comments on commit e611ace

Please sign in to comment.