Skip to content

Commit

Permalink
Rewrite the process scheduler from the ground up
Browse files Browse the repository at this point in the history
This new scheduler still uses work stealing, just using a different and
much better implementation compared to the old scheduler.

Instead of using crossbeam-deque we now use crossbeam-queue's ArrayQueue
type for local queues. crossbeam's Chase-lev deque is generally OK, but
uses epoch based garbage collection and isn't as fast as it could/should
be. Tokio reached the same conclusions back in 2019 [1]. The ArrayQueue
type is a simple bounded MPMC queue, and provides an API that's easier
to work with. When the queue is full, work overflows into the global
queue.

The global queue is just a synchronised Vec. While a mutex may come with
greater overhead, the scheduler tries to avoid using it until truly
necessary, meaning the overhead isn't that big of a deal. A benefit of
using a Vec is that we can quickly steal multiple jobs from this queue,
instead of having to pop values one by one. This reduces the time the
lock stays open, reducing contention.

For putting threads to sleep we use crossbeam-utils' Parker/Unparker
type. These types allow parking/unparking of threads without the need
for condition variables and locks. Crucially, if you unpark a thread
before parking it, the thread doesn't park. This means the
implementation is less likely to result in threads sleeping when they
should instead be performing work.

For stealing work each thread starts stealing from the thread that comes
after it, wrapping around where needed. We experimented with stealing
from a randomly chosen thread but didn't find any significant
performance benefits to doing so. When given the choice between
questionable performance benefits and simplicity, we prefer simplicity.

Each thread also has a priority slot. This slot stores a single process
to run, and other threads can't steal from this slot. This is useful
when rescheduling processes that we can to run as soon as possible.
Currently this is only used when sending a message to a process that
isn't performing work, and we want to await the result immediately.
Since the sending process needs to be suspended and the receiving
process needs to be woken up, we can schedule it onto the current thread
using the priority slot.

When running Inko's test suite and measuring the total execution time,
we found the new scheduler to be 25-30% faster compared to the old
scheduler. We did experiment with alternatives to work stealing, such as
work pushing (where busy threads push excess work to idle threads), but
these alternatives proved much slower than even our old work stealing
scheduler.

This fixes https://gitlab.com/inko-lang/inko/-/issues/273.

[1]: https://tokio.rs/blog/2019-10-scheduler

Changelog: performance
  • Loading branch information
yorickpeterse committed Sep 18, 2022
1 parent 1a51d97 commit 4cf9101
Show file tree
Hide file tree
Showing 16 changed files with 597 additions and 1,203 deletions.
38 changes: 4 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 52 additions & 5 deletions docs/source/internals/vm.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,34 @@ string.

Processes are scheduled onto a fixed-size pool of OS threads, with the default
size being equal to the number of CPU cores. This can be changed by setting the
environment variable `INKO_PROCESS_THREADS` to a value between 0 and 65 535.
environment variable `INKO_PROCESS_THREADS` to a value between 1 and 65 535.

The main thread is reserved for the main process, and the value set in
`INKO_PROCESS_THREADS` specifies the number of _additional_ threads to spawn.
This means that if the value is set to 16, you'll have 16 process threads _in
addition to_ the main thread.
### The main thread

The main OS thread isn't used for anything special, instead it waits for the
process threads to finish. This means that C libraries that require the use of
the main thread won't work with Inko. Few libraries have such requirements, most
of which are GUI libraries, and these probably won't work with Inko anyway due
to their heavy use of callbacks, which Inko doesn't support.

### Load balancing

Work is distributed using a work stealing algorithm. Each thread has a bounded
local queue that they produce work on, and other threads can steal work from
this queue. Threads also have a single slot for a process to run at a higher
priority. This is used in certain cases where we want to reduce latency.

When new work is produced but the queue is full, the work is instead
pushed onto a global queue all threads have access to. Threads perform work in
these steps:

1. Run the process in the priority slot
1. Run all processes in the local queue
1. Steal processes from another thread
1. Steal processes from the global queue
1. Go to sleep until new work is pushed onto the global queue

### Reductions

Processes maintain a reduction counter, starting at a pre-determined value.
Certain operations reduce this counter. When the counter reaches zero it's
Expand All @@ -49,6 +71,31 @@ threads for blocking operations, but we removed this to simplify the VM. An
alternative and better approach is discussed in [this
issue](https://gitlab.com/inko-lang/inko/-/issues/247).

## Timeouts

Processes can suspend themselves with a timeout, or await a future for up to a
certain amount of time. A separate thread called the "timeout worker" handles
managing such processes. The timeout worker uses a binary heap for storing
processes along with their timeouts, sorting them such that those with the
shortest timeout are processed first.

When a process suspends itself with a timeout, it stores itself in a queue owned
by the timeout worker.

The timeout worker performs its work in these steps:

1. Move messages from the synchronised queue into an unsynchronised local FIFO
queue
1. Defragment the heap by removing entries that are no longer valid (e.g. a
process got rescheduled before its timeout expired)
1. Process any new entries to add into the heap
1. Sleep until the shortest timeout expires, taking into account time already
spent sleeping for the given timeout
1. Repeat this cycle until we shut down

If the timeout worker is asleep and a new entry is added to the synchronised
queue, the worker is woken up and the cycle starts anew.

## Memory management

The VM uses the system allocator for allocating memory. You can also build the
Expand Down
4 changes: 2 additions & 2 deletions vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ float-cmp = "^0.9"
dirs-next = "^1.0"
libloading = "^0.7"
libffi = "^3.0"
crossbeam-deque = "^0.8"
crossbeam-channel = "^0.5"
crossbeam-utils = "^0.8"
crossbeam-queue = "^0.3"
libc = "^0.2"
ahash = "^0.8"
rand = "^0.8"
rand = { version = "^0.8", features = ["default", "small_rng"] }
polling = "^2.0"
unicode-segmentation = "^1.8"
bytecode = { path = "../bytecode" }
Expand Down
5 changes: 0 additions & 5 deletions vm/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ pub struct Config {
/// The number of process threads to run.
pub process_threads: u16,

/// The number of threads to use for parsing bytecode images.
pub bytecode_threads: u16,

/// The number of reductions a process can perform before being suspended.
pub reductions: u16,
}
Expand All @@ -35,7 +32,6 @@ impl Config {

Config {
process_threads: cpu_count as u16,
bytecode_threads: cpu_count as u16,
reductions: DEFAULT_REDUCTIONS,
}
}
Expand All @@ -44,7 +40,6 @@ impl Config {
let mut config = Config::new();

set_from_env!(config, process_threads, "PROCESS_THREADS", u16);
set_from_env!(config, bytecode_threads, "BYTECODE_THREADS", u16);
set_from_env!(config, reductions, "REDUCTIONS", u16);

config
Expand Down
6 changes: 3 additions & 3 deletions vm/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ mod tests {

#[test]
fn test_function_from_pointers() {
let state = setup();
let (state, _) = setup();
let name = InkoString::alloc(
state.permanent_space.string_class(),
LIBM.to_string(),
Expand All @@ -673,7 +673,7 @@ mod tests {
#[test]
fn test_function_call() {
let lib = Library::open(&[LIBM]).unwrap();
let state = setup();
let (state, _) = setup();
let arg = Float::alloc(state.permanent_space.float_class(), 3.15);

unsafe {
Expand All @@ -690,7 +690,7 @@ mod tests {

#[test]
fn test_pointer_read_and_write() {
let state = setup();
let (state, _) = setup();

unsafe {
let ptr = Pointer::new(calloc(1, 3));
Expand Down
50 changes: 7 additions & 43 deletions vm/src/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use bytecode::{
Instruction, Opcode, CONST_FLOAT, CONST_INTEGER, CONST_STRING,
SIGNATURE_BYTES, VERSION,
};
use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;
use std::f64;
use std::fs::File;
use std::io::{BufReader, Read};
Expand Down Expand Up @@ -114,12 +112,7 @@ impl Image {

// Now we can load the bytecode for all modules, including the entry
// module. The order in which the modules are returned is unspecified.
read_modules(
config.bytecode_threads as usize,
modules as usize,
&space,
stream,
)?;
read_modules(modules as usize, &space, stream)?;

Ok(Image {
entry_class: unsafe { space.get_class(entry_class) },
Expand Down Expand Up @@ -156,47 +149,18 @@ fn read_builtin_method_counts<R: Read>(
}

fn read_modules<R: Read>(
concurrency: usize,
num_modules: usize,
space: &PermanentSpace,
stream: &mut R,
) -> Result<(), String> {
let (in_sender, in_receiver) = bounded::<Vec<u8>>(num_modules);
for _ in 0..num_modules {
let amount = read_u64(stream)? as usize;
let chunk = read_vec!(stream, amount);

scope(|s| {
let mut handles = Vec::with_capacity(concurrency);

for _ in 0..concurrency {
let handle = s.spawn(|_| -> Result<(), String> {
while let Ok(chunk) = in_receiver.recv() {
read_module(space, &mut &chunk[..])?;
}

Ok(())
});

handles.push(handle);
}

for _ in 0..num_modules {
let amount = read_u64(stream)? as usize;
let chunk = read_vec!(stream, amount);

in_sender.send(chunk).expect("Failed to send a chunk of bytecode");
}

// We need to drop the sender before joining. If we don't, parser
// threads won't terminate until the end of this scope. But since we are
// joining those threads, we'd never reach that point.
drop(in_sender);

for handle in handles {
handle.join().map_err(|e| format!("{:?}", e))??;
}
read_module(space, &mut &chunk[..])?;
}

Ok(())
})
.map_err(|e| format!("{:?}", e))?
Ok(())
}

fn read_module<R: Read>(
Expand Down
34 changes: 23 additions & 11 deletions vm/src/instructions/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::process::{
Future, FutureState, Process, ProcessPointer, RescheduleRights,
TaskPointer, Write, WriteResult,
};
use crate::scheduler::process::Thread;
use crate::scheduler::timeouts::Timeout;
use crate::state::State;
use std::time::Duration;
Expand All @@ -22,30 +23,40 @@ pub(crate) fn allocate(state: &State, class_idx: u32) -> Pointer {
#[inline(always)]
pub(crate) fn send_message(
state: &State,
thread: &mut Thread,
mut task: TaskPointer,
sender: ProcessPointer,
receiver_ptr: Pointer,
method: u16,
wait: bool,
) {
) -> bool {
let mut receiver = unsafe { ProcessPointer::from_pointer(receiver_ptr) };
let args = task.take_arguments();

match receiver.send_message(MethodIndex::new(method), sender, args, wait) {
RescheduleRights::AcquiredWithTimeout => {
state.timeout_worker.increase_expired_timeouts();
state.scheduler.schedule(receiver);
}
RescheduleRights::Acquired => {
state.scheduler.schedule(receiver);
}
_ => {}
RescheduleRights::Acquired => {}
_ => return false,
}

if wait {
// When awaiting the result immediately we want to keep latency as small
// as possible. To achieve this we reschedule the receiver (if allowed)
// onto the current worker with a high priority.
thread.schedule_priority(receiver);
true
} else {
thread.schedule(receiver);
false
}
}

#[inline(always)]
pub(crate) fn send_async_message(
state: &State,
thread: &mut Thread,
mut task: TaskPointer,
receiver_ptr: Pointer,
method: u16,
Expand All @@ -60,10 +71,10 @@ pub(crate) fn send_async_message(
{
RescheduleRights::AcquiredWithTimeout => {
state.timeout_worker.increase_expired_timeouts();
state.scheduler.schedule(receiver);
thread.schedule(receiver);
}
RescheduleRights::Acquired => {
state.scheduler.schedule(receiver);
thread.schedule(receiver);
}
_ => {}
}
Expand Down Expand Up @@ -101,6 +112,7 @@ pub(crate) fn set_field(process: Pointer, index: u16, value: Pointer) {
#[inline(always)]
pub(crate) fn write_result(
state: &State,
thread: &mut Thread,
task: TaskPointer,
result: Pointer,
thrown: bool,
Expand All @@ -114,18 +126,18 @@ pub(crate) fn write_result(
rec.set_return_value(result);
}

state.scheduler.schedule(rec);
thread.schedule(rec);
Pointer::true_singleton()
}
Write::Future(fut) => match fut.write(result, thrown) {
WriteResult::Continue => Pointer::true_singleton(),
WriteResult::Reschedule(consumer) => {
state.scheduler.schedule(consumer);
thread.schedule(consumer);
Pointer::true_singleton()
}
WriteResult::RescheduleWithTimeout(consumer) => {
state.timeout_worker.increase_expired_timeouts();
state.scheduler.schedule(consumer);
thread.schedule(consumer);
Pointer::true_singleton()
}
WriteResult::Discard => Pointer::false_singleton(),
Expand Down
Loading

0 comments on commit 4cf9101

Please sign in to comment.