From 291c129425b7dba75d947380a30b1b3a69063c2d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 8 Dec 2024 11:25:07 -0500 Subject: [PATCH] Make generic versions canonical --- src/trace/implementations/merge_batcher.rs | 179 +-------------------- src/trace/implementations/mod.rs | 1 - src/trace/implementations/ord_neu.rs | 17 +- src/trace/implementations/rhh.rs | 7 +- 4 files changed, 14 insertions(+), 190 deletions(-) diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 5ac3f8851..c5f53eef8 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -10,19 +10,16 @@ //! Implementations of `MergeBatcher` can be instantiated through the choice of both //! the chunker and the merger, provided their respective output and input types align. -use std::collections::VecDeque; use std::marker::PhantomData; use timely::logging_core::Logger; use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; -use timely::{Container, PartialOrder}; +use timely::Container; use timely::container::{ContainerBuilder, PushInto}; -use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder, Description}; -use crate::Data; /// Creates batches from containers of unordered tuples. /// @@ -231,6 +228,8 @@ pub trait Merger: Default { fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); } +pub use container::{VecMerger, ColMerger}; + pub mod container { //! A general purpose `Merger` implementation for arbitrary containers. @@ -472,7 +471,7 @@ pub mod container { use super::{ContainerQueue, MergerChunk}; /// A `Merger` implementation backed by vector containers. - pub type VecMerger = super::ContainerMerger, std::collections::VecDeque<((K, V), T, R)>>; + pub type VecMerger = super::ContainerMerger, std::collections::VecDeque<(D, T, R)>>; impl ContainerQueue> for VecDeque<(D, T, R)> { fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> { @@ -533,7 +532,7 @@ pub mod container { use super::{ContainerQueue, MergerChunk}; /// A `Merger` implementation backed by `TimelyStack` containers (columnation). - pub type ColMerger = super::ContainerMerger,TimelyStackQueue<((K,V), T, R)>>; + pub type ColMerger = super::ContainerMerger,TimelyStackQueue<(D, T, R)>>; /// TODO pub struct TimelyStackQueue { @@ -692,171 +691,3 @@ pub mod container { } } } - -/// A merger that knows how to accept and maintain chains of vectors. -pub struct VecMerger { - _marker: PhantomData, -} - -impl Default for VecMerger { - fn default() -> Self { - Self { _marker: PhantomData } - } -} - -impl VecMerger { - const BUFFER_SIZE_BYTES: usize = 8 << 10; - fn chunk_capacity(&self) -> usize { - let size = ::std::mem::size_of::(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 - } - } - - /// Helper to get pre-sized vector from the stash. - #[inline] - fn empty(&self, stash: &mut Vec>) -> Vec { - stash.pop().unwrap_or_else(|| Vec::with_capacity(self.chunk_capacity())) - } - - /// Helper to return a chunk to the stash. - #[inline] - fn recycle(&self, mut chunk: Vec, stash: &mut Vec>) { - // TODO: Should we limit the size of `stash`? - if chunk.capacity() == self.chunk_capacity() { - chunk.clear(); - stash.push(chunk); - } - } -} - -impl Merger for VecMerger<(D, T, R)> -where - D: Data, - T: Ord + PartialOrder + Clone + 'static, - R: Semigroup + 'static, -{ - type Time = T; - type Chunk = Vec<(D, T, R)>; - - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { - let mut list1 = list1.into_iter(); - let mut list2 = list2.into_iter(); - let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); - let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); - - let mut result = self.empty(stash); - - // while we have valid data in each input, merge. - while !head1.is_empty() && !head2.is_empty() { - while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { - let cmp = { - let x = head1.front().unwrap(); - let y = head2.front().unwrap(); - (&x.0, &x.1).cmp(&(&y.0, &y.1)) - }; - use std::cmp::Ordering; - match cmp { - Ordering::Less => result.push(head1.pop_front().unwrap()), - Ordering::Greater => result.push(head2.pop_front().unwrap()), - Ordering::Equal => { - let (data1, time1, mut diff1) = head1.pop_front().unwrap(); - let (_data2, _time2, diff2) = head2.pop_front().unwrap(); - diff1.plus_equals(&diff2); - if !diff1.is_zero() { - result.push((data1, time1, diff1)); - } - } - } - } - - if result.capacity() == result.len() { - output.push(result); - result = self.empty(stash); - } - - if head1.is_empty() { - let done1 = Vec::from(head1); - self.recycle(done1, stash); - head1 = VecDeque::from(list1.next().unwrap_or_default()); - } - if head2.is_empty() { - let done2 = Vec::from(head2); - self.recycle(done2, stash); - head2 = VecDeque::from(list2.next().unwrap_or_default()); - } - } - - if !result.is_empty() { - output.push(result); - } else { - self.recycle(result, stash); - } - - if !head1.is_empty() { - let mut result = self.empty(stash); - for item1 in head1 { - result.push(item1); - } - output.push(result); - } - output.extend(list1); - - if !head2.is_empty() { - let mut result = self.empty(stash); - for item2 in head2 { - result.push(item2); - } - output.push(result); - } - output.extend(list2); - } - - fn extract( - &mut self, - merged: Vec, - upper: AntichainRef, - frontier: &mut Antichain, - readied: &mut Vec, - kept: &mut Vec, - stash: &mut Vec, - ) { - let mut keep = self.empty(stash); - let mut ready = self.empty(stash); - - for mut buffer in merged { - for (data, time, diff) in buffer.drain(..) { - if upper.less_equal(&time) { - frontier.insert_ref(&time); - if keep.len() == keep.capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.empty(stash); - } - keep.push((data, time, diff)); - } else { - if ready.len() == ready.capacity() && !ready.is_empty() { - readied.push(ready); - ready = self.empty(stash); - } - ready.push((data, time, diff)); - } - } - // Recycling buffer. - self.recycle(buffer, stash); - } - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !ready.is_empty() { - readied.push(ready); - } - } - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { - (chunk.len(), 0, 0, 0) - } -} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 31bd4c628..472293447 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -41,7 +41,6 @@ pub mod spine_fueled; pub mod merge_batcher; -pub mod merge_batcher_col; pub mod merge_batcher_flat; pub mod ord_neu; pub mod rhh; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index cb6c83758..df4e9d68a 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -14,10 +14,8 @@ use timely::container::flatcontainer::{FlatStack, RegionPreference}; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; -use crate::trace::implementations::merge_batcher::{MergeBatcher}; -use crate::trace::implementations::merge_batcher_col::ColumnationMerger; +use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger; -use crate::trace::implementations::merge_batcher::VecMerger; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout}; @@ -28,8 +26,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. -pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>; -// pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, ContainerMerger, std::collections::VecDeque<((K, V), T, R)>>>; +pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -39,7 +36,7 @@ pub type RcOrdValBuilder = RcBuilder = Spine>>>; /// A batcher for columnar storage. -pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>; +pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; /// A builder for columnar storage. pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; @@ -64,8 +61,7 @@ pub type FlatValBuilderDefault = FlatValBuilder = Spine>>>; /// A batcher for ordered lists. -pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>>; -// pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, ContainerMerger, std::collections::VecDeque<((K, ()), T, R)>>>; +pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>; /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; @@ -75,8 +71,7 @@ pub type RcOrdKeyBuilder = RcBuilder /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine>>>; /// A batcher for columnar storage -pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>>; -// pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ContainerMerger,TimelyStackQueue<((K,()), T, R)>>>; +pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; /// A builder for columnar storage pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; @@ -99,7 +94,7 @@ pub type FlatKeyBuilderDefault = FlatKeyBuilder = Spine>>>; /// A batcher for columnar storage. -pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColumnationMerger<((::Owned,::Owned),T,R)>>; +pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColMerger<(::Owned,::Owned),T,R>>; /// A builder for columnar storage. pub type PreferredBuilder = RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 4c1ac9a11..d2160b0b6 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -13,8 +13,7 @@ use timely::container::columnation::TimelyStack; use crate::Hashable; use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; -use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; -use crate::trace::implementations::merge_batcher_col::ColumnationMerger; +use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::rc_blanket_impls::RcBuilder; @@ -25,7 +24,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine>>>; /// A batcher for ordered lists. -pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>; +pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; /// A builder for ordered lists. pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -35,7 +34,7 @@ pub type VecBuilder = RcBuilder, Vec< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine>>>; /// A batcher for columnar storage. -pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>; +pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; /// A builder for columnar storage. pub type ColBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>;