Skip to content

Commit

Permalink
Document and organize MergeBatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Dec 18, 2024
1 parent db1e869 commit a30b84a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 31 deletions.
78 changes: 50 additions & 28 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
//! A general purpose `Batcher` implementation based on radix sort.
//! A `Batcher` implementation based on merge sort.
//!
//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
//! The chunker receives input batches and consolidates them, producing sorted output
//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
//! splitting them apart based on time.
//!
//! Implementations of `MergeBatcher` can be instantiated through the choice of both
//! the chunker and the merger, provided their respective output and input types align.
use std::collections::VecDeque;
use std::marker::PhantomData;
Expand All @@ -14,30 +24,30 @@ use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder, Description};
use crate::Data;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk>,
M: Merger,
{
/// each power-of-two length list of allocations.
/// Do not push/pop directly but use the corresponding functions
/// ([`Self::chain_push`]/[`Self::chain_pop`]).
/// Creates batches from containers of unordered tuples.
///
/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
/// and must produce outputs of type `M::Chunk`.
pub struct MergeBatcher<Input, C, M: Merger> {
/// Transforms input streams to chunks of sorted, consolidated data.
chunker: C,
/// A sequence of power-of-two length lists of sorted, consolidated containers.
///
/// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
chains: Vec<Vec<M::Chunk>>,
/// Stash of empty chunks
/// Stash of empty chunks, recycled through the merging process.
stash: Vec<M::Chunk>,
/// Chunker to transform input streams to chunks of data.
chunker: C,
/// Thing to accept data, merge chains, and talk to the builder.
/// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
merger: M,
/// Logger for size accounting.
logger: Option<Logger<DifferentialEvent>>,
/// Timely operator ID.
operator_id: usize,
/// Current lower frontier, we sealed up to here.
lower: Antichain<M::Time>,
/// The lower-bound frontier of the data, after the last call to seal.
frontier: Antichain<M::Time>,
/// Logger for size accounting.
logger: Option<Logger<DifferentialEvent>>,
/// Timely operator ID.
operator_id: usize,
/// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
_marker: PhantomData<Input>,
}

Expand Down Expand Up @@ -123,7 +133,6 @@ where

impl<Input, C, M> MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger,
{
/// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
Expand Down Expand Up @@ -191,7 +200,6 @@ where

impl<Input, C, M> Drop for MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger,
{
fn drop(&mut self) {
Expand Down Expand Up @@ -225,7 +233,17 @@ pub trait Merger: Default {

pub mod container {

//! A general purpose `Batcher` implementation for arbitrary containers.
//! A general purpose `Merger` implementation for arbitrary containers.
//!
//! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`.
//! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as
//! well as the ability to return the container when iteration is complete.
//! The `MergerChukn` trait is meant to be implemented by containers, and it explains how container
//! items should be interpreted with respect to times, and with respect to differences.
//! These two traits exist instead of a stack of constraints on the structure of the associated items
//! of the containers, allowing them to perform their functions without destructuring their guts.
//!
//! Standard implementations exist in the `vec`, `columnation`, and `flat_container` modules.
use std::cmp::Ordering;
use std::marker::PhantomData;
Expand All @@ -249,20 +267,26 @@ pub mod container {

/// Behavior to dissect items of chunks in the merge batcher
pub trait MergerChunk : SizableContainer {
/// The owned time type.
/// An owned time type.
///
/// This type is provided so that users can maintain antichains of something, in order to track
/// the forward movement of time and extract intervals from chains of updates.
type TimeOwned;
/// The owned diff type.
///
/// This type is provided so that users can provide an owned instance to the `push_and_add` method,
/// to act as a scratch space when the type is substantial and could otherwise require allocations.
type DiffOwned: Default;

/// Compares a borrowed time to an antichain of owned times.
/// Relates a borrowed time to antichains of owned times.
///
/// If `upper` is less or equal to `time`, the method returns true and ensures that `frontier` reflects `time`.
/// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`.
fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool;

/// Push an entry that adds together two diffs.
///
/// This is only called when two items are deemed mergeable by the container queue.
/// If the two diffs would cancel, do not push anything.
/// If the two diffs added together is zero do not push anything.
fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned);

/// Account the allocations behind the chunk.
Expand All @@ -276,6 +300,7 @@ pub mod container {
/// A merger for arbitrary containers.
///
/// `MC` is a [`Container`] that implements [`MergerChunk`].
/// `CQ` is a [`ContainerQueue`] supporting `MC`.
pub struct ContainerMerger<MC, CQ> {
_marker: PhantomData<(MC, CQ)>,
}
Expand All @@ -287,7 +312,6 @@ pub mod container {
}

impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {

/// Helper to get pre-sized vector from the stash.
#[inline]
fn empty(&self, stash: &mut Vec<MC>) -> MC {
Expand All @@ -297,14 +321,12 @@ pub mod container {
container
})
}

/// Helper to return a chunk to the stash.
#[inline]
fn recycle(&self, chunk: MC, stash: &mut Vec<MC>) {
// TODO: Should we only retain correctly sized containers?
stash.push(chunk);
}

}

impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
Expand Down
3 changes: 0 additions & 3 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@ use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegi
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher};
use crate::trace::implementations::merge_batcher::container::ContainerMerger;
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
use crate::trace::implementations::merge_batcher::VecMerger;
use crate::trace::rc_blanket_impls::RcBuilder;

use crate::trace::implementations::merge_batcher::container::columnation::TimelyStackQueue;

use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};

pub use self::val_batch::{OrdValBatch, OrdValBuilder};
Expand Down

0 comments on commit a30b84a

Please sign in to comment.