Skip to content

Commit

Permalink
Make generic versions canonical
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Dec 18, 2024
1 parent a30b84a commit 291c129
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 190 deletions.
179 changes: 5 additions & 174 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,16 @@
//! Implementations of `MergeBatcher` can be instantiated through the choice of both
//! the chunker and the merger, provided their respective output and input types align.
use std::collections::VecDeque;
use std::marker::PhantomData;

use timely::logging_core::Logger;
use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::{Container, PartialOrder};
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};

use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder, Description};
use crate::Data;

/// Creates batches from containers of unordered tuples.
///
Expand Down Expand Up @@ -231,6 +228,8 @@ pub trait Merger: Default {
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}

pub use container::{VecMerger, ColMerger};

pub mod container {

//! A general purpose `Merger` implementation for arbitrary containers.
Expand Down Expand Up @@ -472,7 +471,7 @@ pub mod container {
use super::{ContainerQueue, MergerChunk};

/// A `Merger` implementation backed by vector containers.
pub type VecMerger<K, V, T, R> = super::ContainerMerger<Vec<((K, V), T, R)>, std::collections::VecDeque<((K, V), T, R)>>;
pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;

impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
Expand Down Expand Up @@ -533,7 +532,7 @@ pub mod container {
use super::{ContainerQueue, MergerChunk};

/// A `Merger` implementation backed by `TimelyStack` containers (columnation).
pub type ColMerger<K, V, T, R> = super::ContainerMerger<TimelyStack<((K,V),T,R)>,TimelyStackQueue<((K,V), T, R)>>;
pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;

/// TODO
pub struct TimelyStackQueue<T: Columnation> {
Expand Down Expand Up @@ -692,171 +691,3 @@ pub mod container {
}
}
}

/// A merger that knows how to accept and maintain chains of vectors.
pub struct VecMerger<T> {
_marker: PhantomData<T>,
}

impl<T> Default for VecMerger<T> {
fn default() -> Self {
Self { _marker: PhantomData }
}
}

impl<T> VecMerger<T> {
const BUFFER_SIZE_BYTES: usize = 8 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<T>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
} else if size <= Self::BUFFER_SIZE_BYTES {
Self::BUFFER_SIZE_BYTES / size
} else {
1
}
}

/// Helper to get pre-sized vector from the stash.
#[inline]
fn empty(&self, stash: &mut Vec<Vec<T>>) -> Vec<T> {
stash.pop().unwrap_or_else(|| Vec::with_capacity(self.chunk_capacity()))
}

/// Helper to return a chunk to the stash.
#[inline]
fn recycle(&self, mut chunk: Vec<T>, stash: &mut Vec<Vec<T>>) {
// TODO: Should we limit the size of `stash`?
if chunk.capacity() == self.chunk_capacity() {
chunk.clear();
stash.push(chunk);
}
}
}

impl<D, T, R> Merger for VecMerger<(D, T, R)>
where
D: Data,
T: Ord + PartialOrder + Clone + 'static,
R: Semigroup + 'static,
{
type Time = T;
type Chunk = Vec<(D, T, R)>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();
let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
let mut head2 = VecDeque::from(list2.next().unwrap_or_default());

let mut result = self.empty(stash);

// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
let cmp = {
let x = head1.front().unwrap();
let y = head2.front().unwrap();
(&x.0, &x.1).cmp(&(&y.0, &y.1))
};
use std::cmp::Ordering;
match cmp {
Ordering::Less => result.push(head1.pop_front().unwrap()),
Ordering::Greater => result.push(head2.pop_front().unwrap()),
Ordering::Equal => {
let (data1, time1, mut diff1) = head1.pop_front().unwrap();
let (_data2, _time2, diff2) = head2.pop_front().unwrap();
diff1.plus_equals(&diff2);
if !diff1.is_zero() {
result.push((data1, time1, diff1));
}
}
}
}

if result.capacity() == result.len() {
output.push(result);
result = self.empty(stash);
}

if head1.is_empty() {
let done1 = Vec::from(head1);
self.recycle(done1, stash);
head1 = VecDeque::from(list1.next().unwrap_or_default());
}
if head2.is_empty() {
let done2 = Vec::from(head2);
self.recycle(done2, stash);
head2 = VecDeque::from(list2.next().unwrap_or_default());
}
}

if !result.is_empty() {
output.push(result);
} else {
self.recycle(result, stash);
}

if !head1.is_empty() {
let mut result = self.empty(stash);
for item1 in head1 {
result.push(item1);
}
output.push(result);
}
output.extend(list1);

if !head2.is_empty() {
let mut result = self.empty(stash);
for item2 in head2 {
result.push(item2);
}
output.push(result);
}
output.extend(list2);
}

fn extract(
&mut self,
merged: Vec<Self::Chunk>,
upper: AntichainRef<Self::Time>,
frontier: &mut Antichain<Self::Time>,
readied: &mut Vec<Self::Chunk>,
kept: &mut Vec<Self::Chunk>,
stash: &mut Vec<Self::Chunk>,
) {
let mut keep = self.empty(stash);
let mut ready = self.empty(stash);

for mut buffer in merged {
for (data, time, diff) in buffer.drain(..) {
if upper.less_equal(&time) {
frontier.insert_ref(&time);
if keep.len() == keep.capacity() && !keep.is_empty() {
kept.push(keep);
keep = self.empty(stash);
}
keep.push((data, time, diff));
} else {
if ready.len() == ready.capacity() && !ready.is_empty() {
readied.push(ready);
ready = self.empty(stash);
}
ready.push((data, time, diff));
}
}
// Recycling buffer.
self.recycle(buffer, stash);
}
// Finish the kept data.
if !keep.is_empty() {
kept.push(keep);
}
if !ready.is_empty() {
readied.push(ready);
}
}
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
(chunk.len(), 0, 0, 0)
}
}
1 change: 0 additions & 1 deletion src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
pub mod spine_fueled;

pub mod merge_batcher;
pub mod merge_batcher_col;
pub mod merge_batcher_flat;
pub mod ord_neu;
pub mod rhh;
Expand Down
17 changes: 6 additions & 11 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ use timely::container::flatcontainer::{FlatStack, RegionPreference};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
use crate::trace::implementations::merge_batcher::VecMerger;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
Expand All @@ -28,8 +26,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
// pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, ContainerMerger<Vec<((K, V), T, R)>, std::collections::VecDeque<((K, V), T, R)>>>;
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder using ordered lists.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -39,7 +36,7 @@ pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R
/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

Expand All @@ -64,8 +61,7 @@ pub type FlatValBuilderDefault<K, V, T, R> = FlatValBuilder<FlatLayout<<K as Reg
/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>>;
// pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, ContainerMerger<Vec<((K, ()), T, R)>, std::collections::VecDeque<((K, ()), T, R)>>>;
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// A builder for ordered lists.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

Expand All @@ -75,8 +71,7 @@ pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>
/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>>;
// pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ContainerMerger<TimelyStack<((K,()),T,R)>,TimelyStackQueue<((K,()), T, R)>>>;
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// A builder for columnar storage
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

Expand All @@ -99,7 +94,7 @@ pub type FlatKeyBuilderDefault<K, T, R> = FlatKeyBuilder<FlatLayout<<K as Region
/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A batcher for columnar storage.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>;
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
/// A builder for columnar storage.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;

Expand Down
7 changes: 3 additions & 4 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use timely::container::columnation::TimelyStack;

use crate::Hashable;
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::rc_blanket_impls::RcBuilder;

Expand All @@ -25,7 +24,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for ordered lists.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder for ordered lists.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -35,7 +34,7 @@ pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<
/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

Expand Down

0 comments on commit 291c129

Please sign in to comment.