diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 9d8b68aad..af629539d 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -31,6 +31,7 @@ timely_logging = { path = "../logging", version = "0.12" } timely_communication = { path = "../communication", version = "0.12", default-features = false } timely_container = { path = "../container", version = "0.12" } crossbeam-channel = "0.5.0" +smallvec = { version = "1.13.2", features = ["serde", "const_generics"] } [dev-dependencies] # timely_sort="0.1.6" diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index ee3b52515..886d6b35b 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -34,7 +34,7 @@ fn main() { println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length); - for round in 0 .. { + for round in 0 .. 10 { let dataflow = round % dataflows; if record { inputs[dataflow].send(()); diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs index b14d3a6b5..d7dc58ba7 100644 --- a/timely/src/dataflow/operators/core/capture/capture.rs +++ b/timely/src/dataflow/operators/core/capture/capture.rs @@ -131,7 +131,7 @@ impl Capture for StreamCore { if !progress.frontiers[0].is_empty() { // transmit any frontier progress. let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new()); - event_pusher.push(Event::Progress(to_send.into_inner())); + event_pusher.push(Event::Progress(to_send.into_inner().to_vec())); } use crate::communication::message::RefOrMut; diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 7c0b95dfe..3bb640adc 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -103,7 +103,7 @@ impl Progcaster { self.to_push = Some(Message::from_typed(( self.source, self.counter, - changes.clone().into_inner(), + changes.clone().into_inner().to_vec(), ))); } diff --git a/timely/src/progress/change_batch.rs b/timely/src/progress/change_batch.rs index 6573a45c8..b87ffdac6 100644 --- a/timely/src/progress/change_batch.rs +++ b/timely/src/progress/change_batch.rs @@ -1,5 +1,7 @@ //! A collection of updates of the form `(T, i64)`. +use smallvec::SmallVec; + /// A collection of updates of the form `(T, i64)`. /// /// A `ChangeBatch` accumulates updates of the form `(T, i64)`, where it is capable of consolidating @@ -10,14 +12,14 @@ /// that they may provoke a compaction. I've tried to prevent exposing methods that allow surprisingly /// expensive operations; all operations should take an amortized constant or logarithmic time. #[derive(Clone, Debug, Eq, PartialEq, Abomonation, Serialize, Deserialize)] -pub struct ChangeBatch { +pub struct ChangeBatch { // A list of updates to which we append. - updates: Vec<(T, i64)>, + updates: SmallVec<[(T, i64); X]>, // The length of the prefix of `self.updates` known to be compact. clean: usize, } -impl ChangeBatch { +impl ChangeBatch { /// Allocates a new empty `ChangeBatch`. /// @@ -29,9 +31,9 @@ impl ChangeBatch { /// let mut batch = ChangeBatch::::new(); /// assert!(batch.is_empty()); ///``` - pub fn new() -> ChangeBatch { + pub fn new() -> Self { ChangeBatch { - updates: Vec::new(), + updates: SmallVec::new(), clean: 0, } } @@ -46,9 +48,9 @@ impl ChangeBatch { /// let mut batch = ChangeBatch::::with_capacity(10); /// assert!(batch.is_empty()); ///``` - pub fn with_capacity(capacity: usize) -> ChangeBatch { + pub fn with_capacity(capacity: usize) -> Self { ChangeBatch { - updates: Vec::with_capacity(capacity), + updates: SmallVec::with_capacity(capacity), clean: 0, } } @@ -59,7 +61,7 @@ impl ChangeBatch { } /// Expose the internal vector of updates. - pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates } + pub fn unstable_internal_updates(&self) -> &SmallVec<[(T, i64); X]> { &self.updates } /// Expose the internal value of `clean`. pub fn unstable_internal_clean(&self) -> usize { self.clean } @@ -82,7 +84,7 @@ impl ChangeBatch { } } -impl ChangeBatch +impl ChangeBatch where T: Ord, { @@ -97,7 +99,7 @@ where /// let mut batch = ChangeBatch::::new_from(17, 1); /// assert!(!batch.is_empty()); ///``` - pub fn new_from(key: T, val: i64) -> ChangeBatch { + pub fn new_from(key: T, val: i64) -> Self { let mut result = ChangeBatch::new(); result.update(key, val); result @@ -150,9 +152,9 @@ where /// use timely::progress::ChangeBatch; /// /// let batch = ChangeBatch::::new_from(17, 1); - /// assert_eq!(batch.into_inner(), vec![(17, 1)]); + /// assert_eq!(batch.into_inner().to_vec(), vec![(17, 1)]); ///``` - pub fn into_inner(mut self) -> Vec<(T, i64)> { + pub fn into_inner(mut self) -> SmallVec<[(T, i64); X]> { self.compact(); self.updates } @@ -197,7 +199,7 @@ where /// assert!(batch.is_empty()); ///``` #[inline] - pub fn drain(&mut self) -> ::std::vec::Drain<(T, i64)> { + pub fn drain(&mut self) -> smallvec::Drain<[(T, i64); X]> { self.compact(); self.clean = 0; self.updates.drain(..) @@ -270,7 +272,7 @@ where /// assert!(!batch2.is_empty()); ///``` #[inline] - pub fn drain_into(&mut self, other: &mut ChangeBatch) where T: Clone { + pub fn drain_into(&mut self, other: &mut ChangeBatch) where T: Clone { if other.updates.is_empty() { ::std::mem::swap(self, other); } @@ -311,7 +313,7 @@ where } } -impl Default for ChangeBatch { +impl Default for ChangeBatch { fn default() -> Self { Self::new() } diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 202c6e983..d49192583 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -1,5 +1,7 @@ //! Tracks minimal sets of mutually incomparable elements of a partial order. +use smallvec::SmallVec; + use crate::progress::ChangeBatch; use crate::order::{PartialOrder, TotalOrder}; @@ -15,7 +17,7 @@ use crate::order::{PartialOrder, TotalOrder}; /// are identical. #[derive(Debug, Abomonation, Serialize, Deserialize)] pub struct Antichain { - elements: Vec + elements: SmallVec<[T; 1]> } impl Antichain { @@ -190,7 +192,7 @@ impl Antichain { /// /// let mut frontier = Antichain::::new(); ///``` - pub fn new() -> Antichain { Antichain { elements: Vec::new() } } + pub fn new() -> Antichain { Antichain { elements: SmallVec::new() } } /// Creates a new empty `Antichain` with space for `capacity` elements. /// @@ -203,7 +205,7 @@ impl Antichain { ///``` pub fn with_capacity(capacity: usize) -> Self { Self { - elements: Vec::with_capacity(capacity), + elements: SmallVec::with_capacity(capacity), } } @@ -216,7 +218,11 @@ impl Antichain { /// /// let mut frontier = Antichain::from_elem(2); ///``` - pub fn from_elem(element: T) -> Antichain { Antichain { elements: vec![element] } } + pub fn from_elem(element: T) -> Antichain { + let mut elements = SmallVec::with_capacity(1); + elements.push(element); + Antichain { elements } + } /// Clears the contents of the antichain. /// @@ -330,8 +336,8 @@ impl From> for Antichain { } } -impl Into> for Antichain { - fn into(self) -> Vec { +impl Into> for Antichain { + fn into(self) -> SmallVec<[T; 1]> { self.elements } } @@ -345,7 +351,7 @@ impl ::std::ops::Deref for Antichain { impl ::std::iter::IntoIterator for Antichain { type Item = T; - type IntoIter = ::std::vec::IntoIter; + type IntoIter = smallvec::IntoIter<[T; 1]>; fn into_iter(self) -> Self::IntoIter { self.elements.into_iter() } @@ -520,7 +526,7 @@ impl MutableAntichain { /// assert!(changes == vec![(1, -1), (2, 1)]); ///``` #[inline] - pub fn update_iter(&mut self, updates: I) -> ::std::vec::Drain<'_, (T, i64)> + pub fn update_iter(&mut self, updates: I) -> smallvec::Drain<'_, [(T, i64); 2]> where T: Clone + PartialOrder + Ord, I: IntoIterator, @@ -622,11 +628,11 @@ pub trait MutableAntichainFilter { /// /// assert!(changes == vec![(1, -1), (2, 1)]); /// ``` - fn filter_through(self, antichain: &mut MutableAntichain) -> ::std::vec::Drain<(T,i64)>; + fn filter_through(self, antichain: &mut MutableAntichain) -> smallvec::Drain<[(T,i64); 2]>; } impl> MutableAntichainFilter for I { - fn filter_through(self, antichain: &mut MutableAntichain) -> ::std::vec::Drain<(T,i64)> { + fn filter_through(self, antichain: &mut MutableAntichain) -> smallvec::Drain<[(T,i64); 2]> { antichain.update_iter(self.into_iter()) } } @@ -700,7 +706,7 @@ impl<'a, T: 'a> AntichainRef<'a, T> { ///``` pub fn to_owned(&self) -> Antichain where T: Clone { Antichain { - elements: self.frontier.to_vec() + elements: self.frontier.into() } } }