Skip to content

Commit

Permalink
Merge #528 #530
Browse files Browse the repository at this point in the history
528: Replace coco with crossbeam-deque r=nikomatsakis a=cuviper

These are the changes from @stjepang and @jeehoonkang, replacing and closing #480.  The minimum rustc is *slightly* increased from 1.12 to 1.13 for the transitive requirements.

530: Add examples to par_split_mut and par_chunks_mut r=nikomatsakis a=cuviper

Also add an odd tail to the `par_chunks` example.

cc #420
  • Loading branch information
bors[bot] committed Feb 14, 2018
3 parents 790c968 + 7239558 + 8c41708 commit d746e48
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 24 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ matrix:
fast_finish: true
include:
# NB: To help with CI delays, each `pull_request` is only tested on Linux,
# with 1.12 for compatibility and nightly+rayon_unstable for broad test
# with 1.13 for compatibility and nightly+rayon_unstable for broad test
# coverage. The bors bot counts as a `push` type, which will run it all.

- rust: 1.12.0
- rust: 1.13.0
os: linux
#if: everything!
before_script:
# rand 0.4.2 requires rust 1.22, and rand-0.3.22 requires rand-0.4 :/
# rand 0.4.2 requires rust 1.15, and rand-0.3.22 requires rand-0.4 :/
# manually hacking the lockfile due to the limitations of cargo#2773
- cargo generate-lockfile
- sed -i -e 's/"rand 0.[34].[0-9]\+/"rand 0.3.20/' Cargo.lock
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ just add:
use rayon::prelude::*;
```

Rayon currently requires `rustc 1.12.0` or greater.
Rayon currently requires `rustc 1.13.0` or greater.

## Contribution

Expand Down
2 changes: 1 addition & 1 deletion rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ categories = ["concurrency"]
[dependencies]
rand = ">= 0.3, < 0.5"
num_cpus = "1.2"
coco = "0.3.2"
crossbeam-deque = "0.2.0"
libc = "0.2.16"
lazy_static = "1"

Expand Down
2 changes: 2 additions & 0 deletions rayon-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ rayon-core aims to never, or almost never, have a breaking change to its API, be
Please see [Rayon Docs] for details about using Rayon.

[Rayon Docs]: https://docs.rs/rayon/

Rayon-core currently requires `rustc 1.13.0` or greater.
2 changes: 1 addition & 1 deletion rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::marker::PhantomData;
use std::str::FromStr;
use std::fmt;

extern crate coco;
extern crate crossbeam_deque;
#[macro_use]
extern crate lazy_static;
extern crate libc;
Expand Down
55 changes: 39 additions & 16 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ::{ExitHandler, PanicHandler, StartHandler, ThreadPoolBuilder, ThreadPoolInitError, ErrorKind};
use coco::deque::{self, Worker, Stealer};
use crossbeam_deque::{Deque, Steal, Stealer};
use job::{JobRef, StackJob};
#[cfg(rayon_unstable)]
use job::Job;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub struct Registry {
}

struct RegistryState {
job_injector: Worker<JobRef>,
job_injector: Deque<JobRef>,
}

/// ////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -99,8 +99,12 @@ impl Registry {
let n_threads = builder.get_num_threads();
let breadth_first = builder.get_breadth_first();

let (inj_worker, inj_stealer) = deque::new();
let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads).map(|_| deque::new()).unzip();
let inj_worker = Deque::new();
let inj_stealer = inj_worker.stealer();
let workers: Vec<_> = (0..n_threads)
.map(|_| Deque::new())
.collect();
let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();

let registry = Arc::new(Registry {
thread_infos: stealers.into_iter()
Expand Down Expand Up @@ -304,11 +308,16 @@ impl Registry {
}

fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
let stolen = self.job_uninjector.steal();
if stolen.is_some() {
log!(UninjectedWork { worker: worker_index });
loop {
match self.job_uninjector.steal() {
Steal::Empty => return None,
Steal::Data(d) => {
log!(UninjectedWork { worker: worker_index });
return Some(d);
},
Steal::Retry => {},
}
}
stolen
}

/// If already in a worker-thread of this registry, just execute `op`.
Expand Down Expand Up @@ -407,7 +416,7 @@ pub struct RegistryId {
}

impl RegistryState {
pub fn new(job_injector: Worker<JobRef>) -> RegistryState {
pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
RegistryState {
job_injector: job_injector,
}
Expand Down Expand Up @@ -443,7 +452,7 @@ impl ThreadInfo {
pub struct WorkerThread {
/// the "worker" half of our local deque
worker: Worker<JobRef>,
worker: Deque<JobRef>,

index: usize,

Expand Down Expand Up @@ -515,7 +524,13 @@ impl WorkerThread {
if !self.breadth_first {
self.worker.pop()
} else {
self.worker.steal()
loop {
match self.worker.steal() {
Steal::Empty => return None,
Steal::Data(d) => return Some(d),
Steal::Retry => {},
}
}
}
}

Expand Down Expand Up @@ -603,19 +618,27 @@ impl WorkerThread {
.filter(|&i| i != self.index)
.filter_map(|victim_index| {
let victim = &self.registry.thread_infos[victim_index];
let stolen = victim.stealer.steal();
if stolen.is_some() {
log!(StoleWork { worker: self.index, victim: victim_index });
loop {
match victim.stealer.steal() {
Steal::Empty => return None,
Steal::Data(d) => {
log!(StoleWork {
worker: self.index,
victim: victim_index
});
return Some(d);
},
Steal::Retry => {},
}
}
stolen
})
.next()
}
}

/// ////////////////////////////////////////////////////////////////////////
unsafe fn main_loop(worker: Worker<JobRef>,
unsafe fn main_loop(worker: Deque<JobRef>,
registry: Arc<Registry>,
index: usize,
breadth_first: bool) {
Expand Down
24 changes: 22 additions & 2 deletions src/slice/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ pub trait ParallelSlice<T: Sync> {
///
/// ```
/// use rayon::prelude::*;
/// let windows: Vec<_> = [1, 2, 3, 4].par_chunks(2).collect();
/// assert_eq!(vec![[1, 2], [3, 4]], windows);
/// let chunks: Vec<_> = [1, 2, 3, 4, 5].par_chunks(2).collect();
/// assert_eq!(chunks, vec![&[1, 2][..], &[3, 4], &[5]]);
/// ```
fn par_chunks(&self, chunk_size: usize) -> Chunks<T> {
assert!(chunk_size != 0, "chunk_size must not be zero");
Expand All @@ -101,6 +101,16 @@ pub trait ParallelSliceMut<T: Send> {

/// Returns a parallel iterator over mutable subslices separated by
/// elements that match the separator.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// let mut array = [1, 2, 3, 0, 2, 4, 8, 0, 3, 6, 9];
/// array.par_split_mut(|i| *i == 0)
/// .for_each(|slice| slice.reverse());
/// assert_eq!(array, [3, 2, 1, 0, 8, 4, 2, 0, 9, 6, 3]);
/// ```
fn par_split_mut<P>(&mut self, separator: P) -> SplitMut<T, P>
where P: Fn(&T) -> bool + Sync + Send
{
Expand All @@ -112,6 +122,16 @@ pub trait ParallelSliceMut<T: Send> {

/// Returns a parallel iterator over at most `size` elements of
/// `self` at a time. The chunks are mutable and do not overlap.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// let mut array = [1, 2, 3, 4, 5];
/// array.par_chunks_mut(2)
/// .for_each(|slice| slice.reverse());
/// assert_eq!(array, [2, 1, 4, 3, 5]);
/// ```
fn par_chunks_mut(&mut self, chunk_size: usize) -> ChunksMut<T> {
assert!(chunk_size != 0, "chunk_size must not be zero");
ChunksMut {
Expand Down

0 comments on commit d746e48

Please sign in to comment.