From 85b126c61a2b41e61268e09e39e5bb7dfce3cfc0 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 23 May 2024 16:17:04 -0400 Subject: [PATCH] Pass data from batcher to builder by chunk (#491) * Pass data from batcher to builder by chain Currently, the data shared between the batcher and the builder are individual tuples, either moved or by reference. This limits flexibility around what kind of data can be provided to a builder, i.e., it has to be in the form of tuples, either owned or a reference to a fully-formed one. This works fine for vector-like structures, but will not work for containers that like to arrange their data differently. This change alters the contract between the batcher and the builder to provide chunks instead of individual items (it does not require _chains_.) The data in the chunks must be sorted, and subsequent calls must maintain order, too. The input containers need to implement `BuilderInput`, a type that describes how a container's items can be broken into key, value, time, and diff, where key and value can be references or owned data, as long as they can be pushed into the underlying key and value containers. The change has some quirks around comparing keys to keys already in the builder. The types can differ, and the best solution I could come up with was to add two explicit comparison functions to `BuilderInput` to compare keys and values. While it is not elegant, it allows us to move forward with this change, without adding nightmare-inducing trait bounds all-over. Signed-off-by: Moritz Hoffmann * Address feedback Signed-off-by: Moritz Hoffmann --------- Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 8 +- src/operators/arrange/upsert.rs | 6 +- src/operators/reduce.rs | 23 ++- src/trace/implementations/merge_batcher.rs | 6 +- .../implementations/merge_batcher_col.rs | 9 +- src/trace/implementations/mod.rs | 169 +++++++++++++++++- src/trace/implementations/ord_neu.rs | 161 +++++++---------- src/trace/implementations/rhh.rs | 89 ++++----- src/trace/mod.rs | 17 +- 9 files changed, 298 insertions(+), 190 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 13bbb915b..d8fb9720d 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -75,6 +75,8 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; +use timely::Container; +use timely::container::PushInto; impl Arranged where @@ -292,7 +294,8 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { @@ -311,7 +314,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 758ec8df3..65cdc0b4b 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -138,7 +138,7 @@ where F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder, 
+ Tr::Builder: Builder>, { let mut reader: Option> = None; @@ -282,9 +282,7 @@ where } // Must insert updates in (key, val, time) order. updates.sort(); - for update in updates.drain(..) { - builder.push(update); - } + builder.push(&mut updates); } let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); prev_frontier.clone_from(&upper); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index dda549bca..8711e66a0 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -5,6 +5,8 @@ //! to the key and the list of values. //! The function is expected to populate a list of output values. +use timely::Container; +use timely::container::PushInto; use crate::hashable::Hashable; use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian}; @@ -252,7 +254,7 @@ pub trait ReduceCore where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { @@ -274,7 +276,7 @@ pub trait ReduceCore where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -293,7 +295,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -312,7 +314,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -454,6 +457,8 @@ where builders.push(T2::Builder::new()); } + let mut buffer = Default::default(); + // cursors for navigating input and output traces. let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); let source_storage = &source_storage; @@ -531,7 +536,9 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - builders[index].push(((key.into_owned(), val), time, diff)); + ((key.into_owned(), val), time, diff).push_into(&mut buffer); + builders[index].push(&mut buffer); + buffer.clear(); } } } @@ -648,7 +655,7 @@ where where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -728,7 +735,7 @@ mod history_replay { where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -1020,7 +1027,7 @@ mod history_replay { new_interesting.push(next_time.clone()); debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time))) } - + // Update `meet` to track the meet of each source of times. 
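The reduce hunk above routes each output update through a reusable one-element chunk: the tuple is wrapped with the owned key, pushed into `buffer` via `push_into`, handed to the builder, and the buffer is cleared for the next update. Below is a minimal, self-contained sketch of that pattern; the `PushInto` trait is re-declared locally with the same shape as timely's so the snippet compiles on its own, and a plain vector stands in for the real builder.

    // Local stand-in with the same shape as `timely::container::PushInto`,
    // so this sketch compiles on its own.
    trait PushInto<T> {
        fn push_into(self, target: &mut T);
    }

    impl<D> PushInto<Vec<D>> for D {
        fn push_into(self, target: &mut Vec<D>) {
            target.push(self);
        }
    }

    fn main() {
        let key = "k".to_string();
        let outputs = vec![("a", 0u64, 1i64), ("b", 1, -1)];

        // Reusable one-element chunk, mirroring `buffer` in the hunk above.
        let mut buffer: Vec<((String, &str), u64, i64)> = Vec::new();
        // Stand-in for the builder's backing storage.
        let mut batch = Vec::new();

        for (val, time, diff) in outputs {
            ((key.clone(), val), time, diff).push_into(&mut buffer);
            // Stand-in for `builders[index].push(&mut buffer)`: the builder drains the chunk.
            batch.append(&mut buffer);
            // `push` leaves the chunk in an unspecified state, so the caller clears it.
            buffer.clear();
        }
        assert_eq!(batch.len(), 2);
    }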
meet = None;//T::maximum(); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 45e2a60f8..bb13cf650 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -282,7 +282,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = Vec<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = Vec<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -497,8 +497,8 @@ where } let mut builder = B::with_capacity(keys, vals, upds); - for datum in chain.drain(..).flatten() { - builder.push(datum); + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index aed0039d8..265f2e649 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -67,7 +67,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = TimelyStack<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = TimelyStack<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -290,11 +290,8 @@ where } } let mut builder = B::with_capacity(keys, vals, upds); - - for chunk in chain.drain(..) { - for datum in chunk.iter() { - builder.copy(datum); - } + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 128ec0bf1..d0e4a459a 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -54,7 +54,10 @@ pub use self::ord_neu::OrdKeySpine as KeySpine; use std::borrow::{ToOwned}; use std::cmp::Ordering; +use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; +use timely::container::PushInto; +use timely::progress::Timestamp; use crate::lattice::Lattice; use crate::difference::Semigroup; @@ -138,7 +141,7 @@ where /// A type with a preferred container. /// -/// Examples include types that implement `Clone` who prefer +/// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. type Container: BatchContainer; @@ -161,8 +164,8 @@ impl Update for Preferred where K: ToOwned + ?Sized, K::Owned: Ord+Clone+'static, - V: ToOwned + ?Sized + 'static, - V::Owned: Ord+Clone, + V: ToOwned + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, { @@ -177,8 +180,8 @@ where K: Ord+ToOwned+PreferredContainer + ?Sized, K::Owned: Ord+Clone+'static, // for<'a> K::Container: BatchContainer = &'a K>, - V: Ord+ToOwned+PreferredContainer + ?Sized + 'static, - V::Owned: Ord+Clone, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, D: Semigroup+Clone, { @@ -195,7 +198,7 @@ use abomonation_derive::Abomonation; use crate::trace::cursor::MyTrait; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. 
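Since this patch also gives `OffsetList` an iterator, a `Debug` impl, and a `PushInto` impl (see the hunk just below), here is a self-contained miniature of the underlying idea: a zero prefix, `u32` storage while offsets fit, and a spill-over `u64` vector. Field and method names mirror the diff, but this is an illustrative sketch, not the crate's implementation.

    /// Miniature of the OffsetList idea; not the crate's type.
    #[derive(Default, Debug)]
    struct MiniOffsetList {
        zero_prefix: usize, // length of a leading run of zeros
        smol: Vec<u32>,     // offsets that still fit in a u32
        chonk: Vec<u64>,    // offsets that no longer fit in a u32
    }

    impl MiniOffsetList {
        fn push(&mut self, offset: usize) {
            if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
                self.zero_prefix += 1;
            } else if self.chonk.is_empty() {
                match u32::try_from(offset) {
                    Ok(small) => self.smol.push(small),
                    Err(_) => self.chonk.push(offset as u64),
                }
            } else {
                self.chonk.push(offset as u64);
            }
        }
        fn index(&self, i: usize) -> usize {
            if i < self.zero_prefix {
                0
            } else if i - self.zero_prefix < self.smol.len() {
                self.smol[i - self.zero_prefix] as usize
            } else {
                self.chonk[i - self.zero_prefix - self.smol.len()] as usize
            }
        }
        fn len(&self) -> usize {
            self.zero_prefix + self.smol.len() + self.chonk.len()
        }
        /// Logical iteration, as the new `IntoIterator` impl below provides for the real type.
        fn iter(&self) -> impl Iterator<Item = usize> + '_ {
            (0..self.len()).map(move |i| self.index(i))
        }
    }

    fn main() {
        let mut list = MiniOffsetList::default();
        for offset in [0, 0, 7, u32::MAX as usize + 1] {
            list.push(offset);
        }
        assert_eq!(list.iter().collect::<Vec<_>>(), vec![0, 0, 7, u32::MAX as usize + 1]);
    }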
-#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)] +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)] pub struct OffsetList { /// Length of a prefix of zero elements. pub zero_prefix: usize, @@ -205,6 +208,12 @@ pub struct OffsetList { pub chonk: Vec, } +impl std::fmt::Debug for OffsetList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_list().entries(self.into_iter()).finish() + } +} + impl OffsetList { /// Allocate a new list with a specified capacity. pub fn with_capacity(cap: usize) -> Self { @@ -222,7 +231,7 @@ impl OffsetList { else if self.chonk.is_empty() { if let Ok(smol) = offset.try_into() { self.smol.push(smol); - } + } else { self.chonk.push(offset.try_into().unwrap()) } @@ -249,6 +258,41 @@ impl OffsetList { } } +impl<'a> IntoIterator for &'a OffsetList { + type Item = usize; + type IntoIter = OffsetListIter<'a>; + + fn into_iter(self) -> Self::IntoIter { + OffsetListIter {list: self, index: 0 } + } +} + +/// An iterator for [`OffsetList`]. +pub struct OffsetListIter<'a> { + list: &'a OffsetList, + index: usize, +} + +impl<'a> Iterator for OffsetListIter<'a> { + type Item = usize; + + fn next(&mut self) -> Option { + if self.index < self.list.len() { + let res = Some(self.list.index(self.index)); + self.index += 1; + res + } else { + None + } + } +} + +impl PushInto for usize { + fn push_into(self, target: &mut OffsetList) { + target.push(self); + } +} + /// Helper struct to provide `MyTrait` for `Copy` types. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] pub struct Wrapper(T); @@ -320,12 +364,111 @@ impl BatchContainer for OffsetList { } } +/// Behavior to split an update into principal components. +pub trait BuilderInput: Container { + /// Key portion + type Key<'a>: Ord; + /// Value portion + type Val<'a>: Ord; + /// Time + type Time; + /// Diff + type Diff; + + /// Split an item into separate parts. + fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff); + + /// Test that the key equals a key in the layout's key container. + fn key_eq(this: &Self::Key<'_>, other: ::ReadItem<'_>) -> bool; + + /// Test that the value equals a key in the layout's value container. 
+ fn val_eq(this: &Self::Val<'_>, other: ::ReadItem<'_>) -> bool; +} + +impl BuilderInput> for Vec<((K, V), T, R)> +where + K: Ord + Clone + 'static, + V: Ord + Clone + 'static, + T: Timestamp + Lattice + Clone + 'static, + R: Semigroup + Clone + 'static, +{ + type Key<'a> = K; + type Val<'a> = V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time, diff) + } + + fn key_eq(this: &K, other: &K) -> bool { + this == other + } + + fn val_eq(this: &V, other: &V) -> bool { + this == other + } +} + +impl BuilderInput> for TimelyStack<((K, V), T, R)> +where + K: Ord + Columnation + Clone + 'static, + V: Ord + Columnation + Clone + 'static, + T: Timestamp + Lattice + Columnation + Clone + 'static, + R: Semigroup + Columnation + Clone + 'static, +{ + type Key<'a> = &'a K; + type Val<'a> = &'a V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K, other: &K) -> bool { + *this == other + } + + fn val_eq(this: &&V, other: &V) -> bool { + *this == other + } +} + +impl BuilderInput> for TimelyStack<((::Owned, ::Owned), T, R)> +where + K: Ord+ToOwned+PreferredContainer + ?Sized, + K::Owned: Columnation + Ord+Clone+'static, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Columnation + Ord+Clone+'static, + T: Columnation + Ord+Lattice+Timestamp+Clone, + R: Columnation + Semigroup+Clone, +{ + type Key<'a> = &'a K::Owned; + type Val<'a> = &'a V::Owned; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.equals(this) + } + + fn val_eq(this: &&V::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.equals(this) + } +} + pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. pub mod containers { use timely::container::columnation::{Columnation, TimelyStack}; + use timely::container::PushInto; use std::borrow::ToOwned; use crate::trace::MyTrait; @@ -498,6 +641,18 @@ pub mod containers { inner: Vec, } + impl PushInto> for &[B] { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) + } + } + + impl PushInto> for &Vec { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) + } + } + impl BatchContainer for SliceContainer where B: Ord + Clone + Sized + 'static, diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index a5afee109..ddc8a4409 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -9,6 +9,7 @@ //! and should consume fewer resources (computation and memory) when it applies. use std::rc::Rc; +use timely::container::columnation::{TimelyStack}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; pub type OrdValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. 
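To make the new builder-input contract concrete, the sketch below shows the shape of the hand-off: `into_parts` splits each item of a chunk, and `key_eq` compares an incoming key against whatever the key container already stores, which is how the builders detect a continuation of the current key even when the two sides have different types. The trait here is a simplified, self-contained stand-in (no GATs or timely `Container` bound), not the crate's `BuilderInput`.

    /// Simplified stand-in for the BuilderInput idea above.
    trait MiniBuilderInput {
        type Key: Ord;
        type Val: Ord;
        type Time;
        type Diff;
        type Item;
        /// Split an item into key, value, time, and diff.
        fn into_parts(item: Self::Item) -> (Self::Key, Self::Val, Self::Time, Self::Diff);
        /// Compare an incoming key against one already stored by the builder.
        fn key_eq(this: &Self::Key, other: &Self::Key) -> bool;
    }

    /// Marker for vector-shaped input; the real impl is on `Vec<((K, V), T, R)>` itself.
    struct VecInput<K, V, T, R>(std::marker::PhantomData<(K, V, T, R)>);

    impl<K: Ord, V: Ord, T, R> MiniBuilderInput for VecInput<K, V, T, R> {
        type Key = K;
        type Val = V;
        type Time = T;
        type Diff = R;
        type Item = ((K, V), T, R);
        fn into_parts(((key, val), time, diff): Self::Item) -> (K, V, T, R) {
            (key, val, time, diff)
        }
        fn key_eq(this: &K, other: &K) -> bool {
            this == other
        }
    }

    /// Counts distinct keys in a sorted chunk using only the hooks above,
    /// the same way the builders test "continuation of an already received key".
    fn distinct_keys<CI: MiniBuilderInput>(chunk: Vec<CI::Item>) -> usize {
        let mut keys: Vec<CI::Key> = Vec::new();
        for item in chunk {
            let (key, _val, _time, _diff) = CI::into_parts(item);
            if !keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                keys.push(key);
            }
        }
        keys.len()
    }

    fn main() {
        let chunk = vec![(("a", 1), 0u64, 1i64), (("a", 2), 0, 1), (("b", 1), 0, 1)];
        assert_eq!(distinct_keys::<VecInput<&str, i32, u64, i64>>(chunk), 2);
    }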
// pub type OrdKeySpine = Spine>>>; @@ -33,14 +34,14 @@ pub type OrdValSpine = Spine< pub type ColValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,()),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -49,28 +50,30 @@ pub type OrdKeySpine = Spine< pub type ColKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,()),T,R)>>>, >; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, MergeBatcher::Owned,::Owned),T,R)>,T>, - RcBuilder>>, + RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>, >; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; + mod val_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -148,7 +151,7 @@ mod val_batch { OrdValCursor { key_cursor: 0, val_cursor: 0, - phantom: std::marker::PhantomData, + phantom: PhantomData, } } fn len(&self) -> usize { @@ -189,7 +192,7 @@ mod val_batch { impl Merger> for OrdValMerger where - OrdValBatch: Batch::Time> + OrdValBatch: Batch::Time>, { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -498,7 +501,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdValBuilder { + pub struct OrdValBuilder { result: OrdValStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -506,9 +509,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdValBuilder { + impl OrdValBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -536,9 +540,15 @@ mod val_batch { } } - impl Builder for OrdValBuilder { + impl Builder for OrdValBuilder + where + L: Layout, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + for<'a> CI::Val<'a>: PushInto, + { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdValBatch; @@ -554,62 +564,35 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. 
- if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); + val.push_into(&mut self.result.vals); + key.push_into(&mut self.result.keys); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - self.result.keys.copy_push(key); } } @@ -634,10 +617,11 @@ mod key_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -962,7 +946,7 @@ mod key_batch { } /// A builder for creating layers from unsorted update tuples. 
- pub struct OrdKeyBuilder { + pub struct OrdKeyBuilder { result: OrdKeyStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -970,9 +954,10 @@ mod key_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdKeyBuilder { + impl OrdKeyBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -1000,9 +985,14 @@ mod key_batch { } } - impl Builder for OrdKeyBuilder { + impl Builder for OrdKeyBuilder + where + L: Layout, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + { - type Input = ((::Key, ()), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdKeyBatch; @@ -1016,38 +1006,25 @@ mod key_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, ()), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - self.push_update(time.clone(), diff.clone()); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.keys.copy_push(key); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, _val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. 
+ if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + key.push_into(&mut self.result.keys); + } } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 8389b62a6..60ed6afd4 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -9,6 +9,7 @@ use std::rc::Rc; use std::cmp::Ordering; use abomonation_derive::Abomonation; +use timely::container::columnation::TimelyStack; use crate::Hashable; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; pub type VecSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,7 +34,7 @@ pub type VecSpine = Spine< pub type ColSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -86,12 +87,13 @@ mod val_batch { use std::convert::TryInto; use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::hashable::Hashable; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update, HashOrdered}; @@ -695,7 +697,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct RhhValBuilder + pub struct RhhValBuilder where ::Key: Default + HashOrdered, { @@ -706,9 +708,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl RhhValBuilder + impl RhhValBuilder where ::Key: Default + HashOrdered, { @@ -739,12 +742,14 @@ mod val_batch { } } - impl Builder for RhhValBuilder + impl Builder for RhhValBuilder where ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, + CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, + for<'a> CI::Val<'a>: PushInto, { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = RhhValBatch; @@ -772,64 +777,36 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. 
+ if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); + val.push_into(&mut self.result.vals); + // Insert the key, but with no specified offset. + self.result.insert_key(key.borrow(), None); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key, None); } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index afee7c22e..00b72fc1d 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -336,15 +336,10 @@ pub trait Builder: Sized { /// /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates. fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self; - /// Adds an element to the batch. + /// Adds a chunk of elements to the batch. /// - /// The default implementation uses `self.copy` with references to the owned arguments. - /// One should override it if the builder can take advantage of owned arguments. - fn push(&mut self, element: Self::Input) { - self.copy(&element); - } - /// Adds an element to the batch. - fn copy(&mut self, element: &Self::Input); + /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state. 
+ fn push(&mut self, chunk: &mut Self::Input); /// Completes building and returns the batch. fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; } @@ -454,8 +449,7 @@ pub mod rc_blanket_impls { type Time = B::Time; type Output = Rc; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -561,8 +555,7 @@ pub mod abomonated_blanket_impls { type Time = B::Time; type Output = Abomonated>; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch));
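Putting the pieces together, the caller side now follows the pattern of the merge batchers' `seal` hunks above: size the builder with `with_capacity`, feed it each already-sorted chunk of a chain, and finish with the frontier bounds. The sketch below uses simplified stand-ins (a toy builder and plain `Vec<u64>` frontiers rather than `Antichain`); it mirrors the shape of the new `Builder::push(&mut chunk)` contract rather than the crate's exact types.

    /// Simplified stand-in for the chunk-oriented Builder trait in this patch.
    trait MiniBuilder {
        type Input;
        type Output;
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
        /// Drains `chunk`; callers must assume it is left in an unspecified state.
        fn push(&mut self, chunk: &mut Self::Input);
        fn done(self, lower: Vec<u64>, upper: Vec<u64>, since: Vec<u64>) -> Self::Output;
    }

    struct VecBuilder<K, V> {
        updates: Vec<((K, V), u64, i64)>,
    }

    impl<K, V> MiniBuilder for VecBuilder<K, V> {
        type Input = Vec<((K, V), u64, i64)>;
        type Output = Vec<((K, V), u64, i64)>;
        fn with_capacity(_keys: usize, _vals: usize, upds: usize) -> Self {
            Self { updates: Vec::with_capacity(upds) }
        }
        fn push(&mut self, chunk: &mut Self::Input) {
            self.updates.append(chunk);
        }
        fn done(self, _lower: Vec<u64>, _upper: Vec<u64>, _since: Vec<u64>) -> Self::Output {
            self.updates
        }
    }

    fn main() {
        // A chain of already-sorted chunks, as a merge batcher would hand over.
        let mut chain = vec![
            vec![(("a", 0), 0u64, 1i64), (("a", 1), 0, 1)],
            vec![(("b", 0), 1, 1)],
        ];
        let upds: usize = chain.iter().map(|chunk| chunk.len()).sum();

        let mut builder = VecBuilder::with_capacity(0, 0, upds);
        for chunk in chain.iter_mut() {
            builder.push(chunk);
        }
        let batch = builder.done(vec![0], vec![2], vec![0]);
        assert_eq!(batch.len(), 3);
    }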