Skip to content

Commit

Permalink
Introduce SmallVec for small allocations (#581)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Sep 6, 2024
1 parent abaded2 commit 0da150a
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 28 deletions.
1 change: 1 addition & 0 deletions timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ timely_logging = { path = "../logging", version = "0.12" }
timely_communication = { path = "../communication", version = "0.12", default-features = false }
timely_container = { path = "../container", version = "0.12" }
crossbeam-channel = "0.5.0"
smallvec = { version = "1.13.2", features = ["serde", "const_generics"] }

[dev-dependencies]
# timely_sort="0.1.6"
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
if !progress.frontiers[0].is_empty() {
// transmit any frontier progress.
let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new());
event_pusher.push(Event::Progress(to_send.into_inner()));
event_pusher.push(Event::Progress(to_send.into_inner().to_vec()));
}

use crate::communication::message::RefOrMut;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
self.to_push = Some(Message::from_typed((
self.source,
self.counter,
changes.clone().into_inner(),
changes.clone().into_inner().to_vec(),
)));
}

Expand Down
32 changes: 17 additions & 15 deletions timely/src/progress/change_batch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! A collection of updates of the form `(T, i64)`.
use smallvec::SmallVec;

/// A collection of updates of the form `(T, i64)`.
///
/// A `ChangeBatch` accumulates updates of the form `(T, i64)`, where it is capable of consolidating
Expand All @@ -10,14 +12,14 @@
/// that they may provoke a compaction. I've tried to prevent exposing methods that allow surprisingly
/// expensive operations; all operations should take an amortized constant or logarithmic time.
#[derive(Clone, Debug, Eq, PartialEq, Abomonation, Serialize, Deserialize)]
pub struct ChangeBatch<T> {
pub struct ChangeBatch<T, const X: usize = 2> {
// A list of updates to which we append.
updates: Vec<(T, i64)>,
updates: SmallVec<[(T, i64); X]>,
// The length of the prefix of `self.updates` known to be compact.
clean: usize,
}

impl<T> ChangeBatch<T> {
impl<T, const X: usize> ChangeBatch<T, X> {

/// Allocates a new empty `ChangeBatch`.
///
Expand All @@ -29,9 +31,9 @@ impl<T> ChangeBatch<T> {
/// let mut batch = ChangeBatch::<usize>::new();
/// assert!(batch.is_empty());
///```
pub fn new() -> ChangeBatch<T> {
pub fn new() -> Self {
ChangeBatch {
updates: Vec::new(),
updates: SmallVec::new(),
clean: 0,
}
}
Expand All @@ -46,9 +48,9 @@ impl<T> ChangeBatch<T> {
/// let mut batch = ChangeBatch::<usize>::with_capacity(10);
/// assert!(batch.is_empty());
///```
pub fn with_capacity(capacity: usize) -> ChangeBatch<T> {
pub fn with_capacity(capacity: usize) -> Self {
ChangeBatch {
updates: Vec::with_capacity(capacity),
updates: SmallVec::with_capacity(capacity),
clean: 0,
}
}
Expand All @@ -59,7 +61,7 @@ impl<T> ChangeBatch<T> {
}

/// Expose the internal vector of updates.
pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates }
pub fn unstable_internal_updates(&self) -> &SmallVec<[(T, i64); X]> { &self.updates }

/// Expose the internal value of `clean`.
pub fn unstable_internal_clean(&self) -> usize { self.clean }
Expand All @@ -82,7 +84,7 @@ impl<T> ChangeBatch<T> {
}
}

impl<T> ChangeBatch<T>
impl<T, const X: usize> ChangeBatch<T, X>
where
T: Ord,
{
Expand All @@ -97,7 +99,7 @@ where
/// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
/// assert!(!batch.is_empty());
///```
pub fn new_from(key: T, val: i64) -> ChangeBatch<T> {
pub fn new_from(key: T, val: i64) -> Self {
let mut result = ChangeBatch::new();
result.update(key, val);
result
Expand Down Expand Up @@ -150,9 +152,9 @@ where
/// use timely::progress::ChangeBatch;
///
/// let batch = ChangeBatch::<usize>::new_from(17, 1);
/// assert_eq!(batch.into_inner(), vec![(17, 1)]);
/// assert_eq!(batch.into_inner().to_vec(), vec![(17, 1)]);
///```
pub fn into_inner(mut self) -> Vec<(T, i64)> {
pub fn into_inner(mut self) -> SmallVec<[(T, i64); X]> {
self.compact();
self.updates
}
Expand Down Expand Up @@ -197,7 +199,7 @@ where
/// assert!(batch.is_empty());
///```
#[inline]
pub fn drain(&mut self) -> ::std::vec::Drain<(T, i64)> {
pub fn drain(&mut self) -> smallvec::Drain<[(T, i64); X]> {
self.compact();
self.clean = 0;
self.updates.drain(..)
Expand Down Expand Up @@ -270,7 +272,7 @@ where
/// assert!(!batch2.is_empty());
///```
#[inline]
pub fn drain_into(&mut self, other: &mut ChangeBatch<T>) where T: Clone {
pub fn drain_into(&mut self, other: &mut ChangeBatch<T, X>) where T: Clone {
if other.updates.is_empty() {
::std::mem::swap(self, other);
}
Expand Down Expand Up @@ -311,7 +313,7 @@ where
}
}

impl<T> Default for ChangeBatch<T> {
impl<T, const X: usize> Default for ChangeBatch<T, X> {
fn default() -> Self {
Self::new()
}
Expand Down
28 changes: 17 additions & 11 deletions timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Tracks minimal sets of mutually incomparable elements of a partial order.
use smallvec::SmallVec;

use crate::progress::ChangeBatch;
use crate::order::{PartialOrder, TotalOrder};

Expand All @@ -15,7 +17,7 @@ use crate::order::{PartialOrder, TotalOrder};
/// are identical.
#[derive(Debug, Abomonation, Serialize, Deserialize)]
pub struct Antichain<T> {
elements: Vec<T>
elements: SmallVec<[T; 1]>
}

impl<T: PartialOrder> Antichain<T> {
Expand Down Expand Up @@ -190,7 +192,7 @@ impl<T> Antichain<T> {
///
/// let mut frontier = Antichain::<u32>::new();
///```
pub fn new() -> Antichain<T> { Antichain { elements: Vec::new() } }
pub fn new() -> Antichain<T> { Antichain { elements: SmallVec::new() } }

/// Creates a new empty `Antichain` with space for `capacity` elements.
///
Expand All @@ -203,7 +205,7 @@ impl<T> Antichain<T> {
///```
pub fn with_capacity(capacity: usize) -> Self {
Self {
elements: Vec::with_capacity(capacity),
elements: SmallVec::with_capacity(capacity),
}
}

Expand All @@ -216,7 +218,11 @@ impl<T> Antichain<T> {
///
/// let mut frontier = Antichain::from_elem(2);
///```
pub fn from_elem(element: T) -> Antichain<T> { Antichain { elements: vec![element] } }
pub fn from_elem(element: T) -> Antichain<T> {
let mut elements = SmallVec::with_capacity(1);
elements.push(element);
Antichain { elements }
}

/// Clears the contents of the antichain.
///
Expand Down Expand Up @@ -330,8 +336,8 @@ impl<T: PartialOrder> From<Vec<T>> for Antichain<T> {
}
}

impl<T> Into<Vec<T>> for Antichain<T> {
fn into(self) -> Vec<T> {
impl<T> Into<SmallVec<[T; 1]>> for Antichain<T> {
fn into(self) -> SmallVec<[T; 1]> {
self.elements
}
}
Expand All @@ -345,7 +351,7 @@ impl<T> ::std::ops::Deref for Antichain<T> {

impl<T> ::std::iter::IntoIterator for Antichain<T> {
type Item = T;
type IntoIter = ::std::vec::IntoIter<T>;
type IntoIter = smallvec::IntoIter<[T; 1]>;
fn into_iter(self) -> Self::IntoIter {
self.elements.into_iter()
}
Expand Down Expand Up @@ -520,7 +526,7 @@ impl<T> MutableAntichain<T> {
/// assert!(changes == vec![(1, -1), (2, 1)]);
///```
#[inline]
pub fn update_iter<I>(&mut self, updates: I) -> ::std::vec::Drain<'_, (T, i64)>
pub fn update_iter<I>(&mut self, updates: I) -> smallvec::Drain<'_, [(T, i64); 2]>
where
T: Clone + PartialOrder + Ord,
I: IntoIterator<Item = (T, i64)>,
Expand Down Expand Up @@ -622,11 +628,11 @@ pub trait MutableAntichainFilter<T: PartialOrder+Ord+Clone> {
///
/// assert!(changes == vec![(1, -1), (2, 1)]);
/// ```
fn filter_through(self, antichain: &mut MutableAntichain<T>) -> ::std::vec::Drain<(T,i64)>;
fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<[(T,i64); 2]>;
}

impl<T: PartialOrder+Ord+Clone, I: IntoIterator<Item=(T,i64)>> MutableAntichainFilter<T> for I {
fn filter_through(self, antichain: &mut MutableAntichain<T>) -> ::std::vec::Drain<(T,i64)> {
fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<[(T,i64); 2]> {
antichain.update_iter(self.into_iter())
}
}
Expand Down Expand Up @@ -700,7 +706,7 @@ impl<'a, T: 'a> AntichainRef<'a, T> {
///```
pub fn to_owned(&self) -> Antichain<T> where T: Clone {
Antichain {
elements: self.frontier.to_vec()
elements: self.frontier.into()
}
}
}
Expand Down

0 comments on commit 0da150a

Please sign in to comment.