diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 74dbdc0df..5ac3f8851 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,4 +1,14 @@ -//! A general purpose `Batcher` implementation based on radix sort. +//! A `Batcher` implementation based on merge sort. +//! +//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger". +//! The chunker receives input batches and consolidates them, producing sorted output +//! "chunks" that are fully consolidated (no adjacent updates can be accumulated). +//! The merger implements the [`Merger`] trait, and provides hooks for manipulating +//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also +//! splitting them apart based on time. +//! +//! Implementations of `MergeBatcher` can be instantiated through the choice of both +//! the chunker and the merger, provided their respective output and input types align. use std::collections::VecDeque; use std::marker::PhantomData; @@ -14,30 +24,30 @@ use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder, Description}; use crate::Data; -/// Creates batches from unordered tuples. -pub struct MergeBatcher -where - C: ContainerBuilder, - M: Merger, -{ - /// each power-of-two length list of allocations. - /// Do not push/pop directly but use the corresponding functions - /// ([`Self::chain_push`]/[`Self::chain_pop`]). +/// Creates batches from containers of unordered tuples. +/// +/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs, +/// and must produce outputs of type `M::Chunk`. +pub struct MergeBatcher { + /// Transforms input streams to chunks of sorted, consolidated data. + chunker: C, + /// A sequence of power-of-two length lists of sorted, consolidated containers. 
+ /// + /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]). chains: Vec>, - /// Stash of empty chunks + /// Stash of empty chunks, recycled through the merging process. stash: Vec, - /// Chunker to transform input streams to chunks of data. - chunker: C, - /// Thing to accept data, merge chains, and talk to the builder. + /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time. merger: M, - /// Logger for size accounting. - logger: Option>, - /// Timely operator ID. - operator_id: usize, /// Current lower frontier, we sealed up to here. lower: Antichain, /// The lower-bound frontier of the data, after the last call to seal. frontier: Antichain, + /// Logger for size accounting. + logger: Option>, + /// Timely operator ID. + operator_id: usize, + /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. _marker: PhantomData, } @@ -123,7 +133,6 @@ where impl MergeBatcher where - C: ContainerBuilder + Default, M: Merger, { /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered @@ -191,7 +200,6 @@ where impl Drop for MergeBatcher where - C: ContainerBuilder + Default, M: Merger, { fn drop(&mut self) { @@ -225,7 +233,17 @@ pub trait Merger: Default { pub mod container { - //! A general purpose `Batcher` implementation for arbitrary containers. + //! A general purpose `Merger` implementation for arbitrary containers. + //! + //! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`. + //! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as + //! well as the ability to return the container when iteration is complete. + //! The `MergerChunk` trait is meant to be implemented by containers, and it explains how container + //! 
+ //! items should be interpreted with respect to times, and with respect to differences. + //! These two traits exist instead of a stack of constraints on the structure of the associated items + //! of the containers, allowing them to perform their functions without destructuring their guts. + //! + //! Standard implementations exist in the `vec`, `columnation`, and `flat_container` modules. use std::cmp::Ordering; use std::marker::PhantomData; @@ -249,20 +267,26 @@ pub mod container { /// Behavior to dissect items of chunks in the merge batcher pub trait MergerChunk : SizableContainer { - /// The owned time type. + /// An owned time type. + /// + /// This type is provided so that users can maintain antichains of something, in order to track + /// the forward movement of time and extract intervals from chains of updates. type TimeOwned; /// The owned diff type. + /// + /// This type is provided so that users can provide an owned instance to the `push_and_add` method, + /// to act as a scratch space when the type is substantial and could otherwise require allocations. type DiffOwned: Default; - /// Compares a borrowed time to an antichain of owned times. + /// Relates a borrowed time to antichains of owned times. /// - /// If `upper` is less or equal to `time`, the method returns true and ensures that `frontier` reflects `time`. + /// If `upper` is less than or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`. fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool; /// Push an entry that adds together two diffs. /// /// This is only called when two items are deemed mergeable by the container queue. - /// If the two diffs would cancel, do not push anything. + /// If the two diffs sum to zero, do not push anything. fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned); /// Account the allocations behind the chunk. 
@@ -276,6 +300,7 @@ pub mod container { /// A merger for arbitrary containers. /// /// `MC` is a [`Container`] that implements [`MergerChunk`]. + /// `CQ` is a [`ContainerQueue`] supporting `MC`. pub struct ContainerMerger { _marker: PhantomData<(MC, CQ)>, } @@ -287,7 +312,6 @@ pub mod container { } impl ContainerMerger { - /// Helper to get pre-sized vector from the stash. #[inline] fn empty(&self, stash: &mut Vec) -> MC { @@ -297,14 +321,12 @@ pub mod container { container }) } - /// Helper to return a chunk to the stash. #[inline] fn recycle(&self, chunk: MC, stash: &mut Vec) { // TODO: Should we only retain correctly sized containers? stash.push(chunk); } - } impl Merger for ContainerMerger diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 37eed5543..cb6c83758 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -15,14 +15,11 @@ use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegi use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher}; -use crate::trace::implementations::merge_batcher::container::ContainerMerger; use crate::trace::implementations::merge_batcher_col::ColumnationMerger; use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger; use crate::trace::implementations::merge_batcher::VecMerger; use crate::trace::rc_blanket_impls::RcBuilder; -use crate::trace::implementations::merge_batcher::container::columnation::TimelyStackQueue; - use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout}; pub use self::val_batch::{OrdValBatch, OrdValBuilder};