Skip to content

Commit

Permalink
Make inherent reduce methods
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Dec 6, 2023
1 parent 6468a5a commit 22ccc47
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 29 deletions.
9 changes: 4 additions & 5 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,15 @@ fn main() {
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
use differential_dataflow::operators::reduce::ReduceCore;

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |k,_v1,_v2| {
println!("{:?}", k.text);
Expand Down
43 changes: 43 additions & 0 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,49 @@ where
}
}

// Direct reduce implementations.
use difference::Abelian;
impl<G: Scope, T1> Arranged<G, T1>
where
G::Timestamp: Lattice+Ord,
T1: TraceReader<Time=G::Timestamp>+Clone+'static,
T1::Diff: Semigroup,
{
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
change.extend(output.drain(..).map(|(x,d)| (x, d.negate())));
crate::consolidation::consolidate(change);
})
}

/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
use operators::reduce::reduce_trace;
reduce_trace(self, name, logic)
}
}


impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
where
G::Timestamp: Lattice+Ord,
Expand Down
30 changes: 6 additions & 24 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use trace::implementations::{KeySpine, ValSpine};
use trace::TraceReader;

/// Extension trait for the `reduce` differential dataflow method.
pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> : ReduceCore<G, K, V, R> where G::Timestamp: Lattice+Ord {
pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
/// Applies a reduction function on records grouped by key.
///
/// Input data must be structured as `(key, val)` pairs.
Expand Down Expand Up @@ -327,28 +327,10 @@ where
}
}

impl<G: Scope, K, V, T1, R: Semigroup> ReduceCore<G, K, V, R> for Arranged<G, T1>
where
K: ToOwned + Ord + ?Sized,
K::Owned: Data,
V: ToOwned + Ord + ?Sized,
G::Timestamp: Lattice+Ord,
T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwned = <K as ToOwned>::Owned, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static,
{
fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
reduce_trace(self, name, logic)
}
}

fn reduce_trace<G, T1, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
pub fn reduce_trace<G, T1, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
G: Scope,
G::Timestamp: Lattice+Ord,
Expand All @@ -359,7 +341,7 @@ where
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: for<'a> FnMut(T1::Key<'a>, &[(T1::Val<'a>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
{
let mut result_trace = None;

Expand Down

0 comments on commit 22ccc47

Please sign in to comment.