Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace coco with crossbeam-deque #480

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ keywords = ["parallel", "thread", "concurrency", "join", "performance"]
categories = ["concurrency"]

[dependencies]
rand = "0.3"
num_cpus = "1.2"
coco = "0.3.2"
libc = "0.2.16"
crossbeam-deque = "0.1.0"
lazy_static = "0.2.2"
libc = "0.2.16"
num_cpus = "1.2"
rand = "0.3"

[dev-dependencies]
2 changes: 1 addition & 1 deletion rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::marker::PhantomData;
use std::str::FromStr;
use std::fmt;

extern crate coco;
extern crate crossbeam_deque;
#[macro_use]
extern crate lazy_static;
extern crate libc;
Expand Down
44 changes: 34 additions & 10 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ::{Configuration, ExitHandler, PanicHandler, StartHandler};
use coco::deque::{self, Worker, Stealer};
use crossbeam_deque::{Deque, Steal, Stealer};
use job::{JobRef, StackJob};
#[cfg(rayon_unstable)]
use job::Job;
Expand Down Expand Up @@ -63,7 +63,7 @@ pub struct Registry {
}

struct RegistryState {
job_injector: Worker<JobRef>,
job_injector: Deque<JobRef>,
}

/// ////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -114,11 +114,17 @@ impl<'a> Drop for Terminator<'a> {

impl Registry {
pub fn new(mut configuration: Configuration) -> Result<Arc<Registry>, Box<Error>> {
const MIN_DEQUE_CAPACITY: usize = 1000;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This number is made up - I just chose one that seemed reasonable.


let n_threads = configuration.get_num_threads();
let breadth_first = configuration.get_breadth_first();

let (inj_worker, inj_stealer) = deque::new();
let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads).map(|_| deque::new()).unzip();
let inj_worker = Deque::with_min_capacity(MIN_DEQUE_CAPACITY);
let inj_stealer = inj_worker.stealer();
let workers: Vec<_> = (0..n_threads)
.map(|_| Deque::with_min_capacity(MIN_DEQUE_CAPACITY))
.collect();
let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();

let registry = Arc::new(Registry {
thread_infos: stealers.into_iter()
Expand Down Expand Up @@ -320,7 +326,13 @@ impl Registry {
}

fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
let stolen = self.job_uninjector.steal();
let stolen = loop {
match self.job_uninjector.steal() {
Steal::Empty => break None,
Steal::Data(d) => break Some(d),
Steal::Retry => {},
}
};
if stolen.is_some() {
log!(UninjectedWork { worker: worker_index });
}
Expand Down Expand Up @@ -423,7 +435,7 @@ pub struct RegistryId {
}

impl RegistryState {
pub fn new(job_injector: Worker<JobRef>) -> RegistryState {
pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
RegistryState {
job_injector: job_injector,
}
Expand Down Expand Up @@ -459,7 +471,7 @@ impl ThreadInfo {

pub struct WorkerThread {
/// the "worker" half of our local deque
worker: Worker<JobRef>,
worker: Deque<JobRef>,

index: usize,

Expand Down Expand Up @@ -531,7 +543,13 @@ impl WorkerThread {
if !self.breadth_first {
self.worker.pop()
} else {
self.worker.steal()
loop {
match self.worker.steal() {
Steal::Empty => break None,
Steal::Data(d) => break Some(d),
Steal::Retry => {},
}
}
}
}

Expand Down Expand Up @@ -619,7 +637,13 @@ impl WorkerThread {
.filter(|&i| i != self.index)
.filter_map(|victim_index| {
let victim = &self.registry.thread_infos[victim_index];
let stolen = victim.stealer.steal();
let stolen = loop {
match victim.stealer.steal() {
Steal::Empty => break None,
Steal::Data(d) => break Some(d),
Steal::Retry => {},
}
};
if stolen.is_some() {
log!(StoleWork { worker: self.index, victim: victim_index });
}
Expand All @@ -631,7 +655,7 @@ impl WorkerThread {

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Worker<JobRef>,
unsafe fn main_loop(worker: Deque<JobRef>,
registry: Arc<Registry>,
index: usize,
breadth_first: bool) {
Expand Down