Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work towards Batcher unification #553

Merged
Prev Previous commit
Next Next commit
Make generic versions canonical
  • Loading branch information
frankmcsherry committed Dec 18, 2024
commit 291c129425b7dba75d947380a30b1b3a69063c2d
179 changes: 5 additions & 174 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,16 @@
//! Implementations of `MergeBatcher` can be instantiated through the choice of both
//! the chunker and the merger, provided their respective output and input types align.

use std::collections::VecDeque;
use std::marker::PhantomData;

use timely::logging_core::Logger;
use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::{Container, PartialOrder};
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};

use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder, Description};
use crate::Data;

/// Creates batches from containers of unordered tuples.
///
Expand Down Expand Up @@ -231,6 +228,8 @@ pub trait Merger: Default {
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}

pub use container::{VecMerger, ColMerger};

pub mod container {

//! A general purpose `Merger` implementation for arbitrary containers.
Expand Down Expand Up @@ -472,7 +471,7 @@ pub mod container {
use super::{ContainerQueue, MergerChunk};

/// A `Merger` implementation backed by vector containers.
pub type VecMerger<K, V, T, R> = super::ContainerMerger<Vec<((K, V), T, R)>, std::collections::VecDeque<((K, V), T, R)>>;
pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;

impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
Expand Down Expand Up @@ -533,7 +532,7 @@ pub mod container {
use super::{ContainerQueue, MergerChunk};

/// A `Merger` implementation backed by `TimelyStack` containers (columnation).
pub type ColMerger<K, V, T, R> = super::ContainerMerger<TimelyStack<((K,V),T,R)>,TimelyStackQueue<((K,V), T, R)>>;
pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;

/// TODO
pub struct TimelyStackQueue<T: Columnation> {
Expand Down Expand Up @@ -692,171 +691,3 @@ pub mod container {
}
}
}

/// A merger that knows how to accept and maintain chains of vectors.
pub struct VecMerger<T> {
_marker: PhantomData<T>,
}

impl<T> Default for VecMerger<T> {
fn default() -> Self {
Self { _marker: PhantomData }
}
}

impl<T> VecMerger<T> {
const BUFFER_SIZE_BYTES: usize = 8 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<T>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
} else if size <= Self::BUFFER_SIZE_BYTES {
Self::BUFFER_SIZE_BYTES / size
} else {
1
}
}

/// Helper to get pre-sized vector from the stash.
#[inline]
fn empty(&self, stash: &mut Vec<Vec<T>>) -> Vec<T> {
stash.pop().unwrap_or_else(|| Vec::with_capacity(self.chunk_capacity()))
}

/// Helper to return a chunk to the stash.
#[inline]
fn recycle(&self, mut chunk: Vec<T>, stash: &mut Vec<Vec<T>>) {
// TODO: Should we limit the size of `stash`?
if chunk.capacity() == self.chunk_capacity() {
chunk.clear();
stash.push(chunk);
}
}
}

impl<D, T, R> Merger for VecMerger<(D, T, R)>
where
D: Data,
T: Ord + PartialOrder + Clone + 'static,
R: Semigroup + 'static,
{
type Time = T;
type Chunk = Vec<(D, T, R)>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();
let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
let mut head2 = VecDeque::from(list2.next().unwrap_or_default());

let mut result = self.empty(stash);

// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
let cmp = {
let x = head1.front().unwrap();
let y = head2.front().unwrap();
(&x.0, &x.1).cmp(&(&y.0, &y.1))
};
use std::cmp::Ordering;
match cmp {
Ordering::Less => result.push(head1.pop_front().unwrap()),
Ordering::Greater => result.push(head2.pop_front().unwrap()),
Ordering::Equal => {
let (data1, time1, mut diff1) = head1.pop_front().unwrap();
let (_data2, _time2, diff2) = head2.pop_front().unwrap();
diff1.plus_equals(&diff2);
if !diff1.is_zero() {
result.push((data1, time1, diff1));
}
}
}
}

if result.capacity() == result.len() {
output.push(result);
result = self.empty(stash);
}

if head1.is_empty() {
let done1 = Vec::from(head1);
self.recycle(done1, stash);
head1 = VecDeque::from(list1.next().unwrap_or_default());
}
if head2.is_empty() {
let done2 = Vec::from(head2);
self.recycle(done2, stash);
head2 = VecDeque::from(list2.next().unwrap_or_default());
}
}

if !result.is_empty() {
output.push(result);
} else {
self.recycle(result, stash);
}

if !head1.is_empty() {
let mut result = self.empty(stash);
for item1 in head1 {
result.push(item1);
}
output.push(result);
}
output.extend(list1);

if !head2.is_empty() {
let mut result = self.empty(stash);
for item2 in head2 {
result.push(item2);
}
output.push(result);
}
output.extend(list2);
}

fn extract(
&mut self,
merged: Vec<Self::Chunk>,
upper: AntichainRef<Self::Time>,
frontier: &mut Antichain<Self::Time>,
readied: &mut Vec<Self::Chunk>,
kept: &mut Vec<Self::Chunk>,
stash: &mut Vec<Self::Chunk>,
) {
let mut keep = self.empty(stash);
let mut ready = self.empty(stash);

for mut buffer in merged {
for (data, time, diff) in buffer.drain(..) {
if upper.less_equal(&time) {
frontier.insert_ref(&time);
if keep.len() == keep.capacity() && !keep.is_empty() {
kept.push(keep);
keep = self.empty(stash);
}
keep.push((data, time, diff));
} else {
if ready.len() == ready.capacity() && !ready.is_empty() {
readied.push(ready);
ready = self.empty(stash);
}
ready.push((data, time, diff));
}
}
// Recycling buffer.
self.recycle(buffer, stash);
}
// Finish the kept data.
if !keep.is_empty() {
kept.push(keep);
}
if !ready.is_empty() {
readied.push(ready);
}
}
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
(chunk.len(), 0, 0, 0)
}
}
1 change: 0 additions & 1 deletion src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
pub mod spine_fueled;

pub mod merge_batcher;
pub mod merge_batcher_col;
pub mod merge_batcher_flat;
pub mod ord_neu;
pub mod rhh;
Expand Down
17 changes: 6 additions & 11 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ use timely::container::flatcontainer::{FlatStack, RegionPreference};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
use crate::trace::implementations::merge_batcher::VecMerger;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
Expand All @@ -28,8 +26,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
// pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, ContainerMerger<Vec<((K, V), T, R)>, std::collections::VecDeque<((K, V), T, R)>>>;
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder using ordered lists.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -39,7 +36,7 @@ pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R
/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

Expand All @@ -64,8 +61,7 @@ pub type FlatValBuilderDefault<K, V, T, R> = FlatValBuilder<FlatLayout<<K as Reg
/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>>;
// pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, ContainerMerger<Vec<((K, ()), T, R)>, std::collections::VecDeque<((K, ()), T, R)>>>;
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// A builder for ordered lists.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

Expand All @@ -75,8 +71,7 @@ pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>
/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>>;
// pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ContainerMerger<TimelyStack<((K,()),T,R)>,TimelyStackQueue<((K,()), T, R)>>>;
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// A builder for columnar storage
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

Expand All @@ -99,7 +94,7 @@ pub type FlatKeyBuilderDefault<K, T, R> = FlatKeyBuilder<FlatLayout<<K as Region
/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A batcher for columnar storage.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>;
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
/// A builder for columnar storage.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;

Expand Down
7 changes: 3 additions & 4 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use timely::container::columnation::TimelyStack;

use crate::Hashable;
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::rc_blanket_impls::RcBuilder;

Expand All @@ -25,7 +24,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for ordered lists.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder for ordered lists.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -35,7 +34,7 @@ pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<
/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

Expand Down
Loading