diff --git a/container/Cargo.toml b/container/Cargo.toml
index 470172198..3ec856b62 100644
--- a/container/Cargo.toml
+++ b/container/Cargo.toml
@@ -7,5 +7,6 @@ license = "MIT"
 
 [dependencies]
 columnation = { git = "https://github.com/frankmcsherry/columnation" }
-flatcontainer = "0.4"
+#flatcontainer = "0.4"
+flatcontainer = { path = "../../flatcontainer" }
 serde = { version = "1.0"}
diff --git a/container/src/lib.rs b/container/src/lib.rs
index a86c09b5a..0cddb3864 100644
--- a/container/src/lib.rs
+++ b/container/src/lib.rs
@@ -6,6 +6,7 @@ use std::collections::VecDeque;
 
 pub mod columnation;
 pub mod flatcontainer;
+pub mod zero_copy;
 
 /// A container transferring data through dataflow edges
 ///
diff --git a/container/src/zero_copy.rs b/container/src/zero_copy.rs
new file mode 100644
index 000000000..d3af2265b
--- /dev/null
+++ b/container/src/zero_copy.rs
@@ -0,0 +1,147 @@
+//! Zero-copy container builders
+
+use std::collections::VecDeque;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+use flatcontainer::{FlatStack, Push, Region};
+use flatcontainer::flatten::{DefaultFlatWrite, DerefWrapper, Entomb, Exhume};
+
+use crate::{Container, ContainerBuilder, PushInto};
+
+type Buffer = DerefWrapper<Arc<Vec<u8>>>;
+
+/// A container builder that absorbs items into a [`FlatStack`], entombs them into
+/// byte buffers, and hands the buffers out as [`ZeroCopyWrapper`] containers.
+pub struct ZeroCopyBuilder<R>
+where
+    R: Region + Exhume,
+{
+    /// Items pushed but not yet entombed into a byte buffer.
+    pending: FlatStack<R>,
+    /// Entombed byte buffers awaiting extraction.
+    ready: VecDeque<Vec<u8>>,
+    /// The container most recently handed out by `extract`/`finish`.
+    current: Option<ZeroCopyWrapper<FlatStack<R::Flat>>>,
+}
+
+impl<R> Default for ZeroCopyBuilder<R>
+where
+    R: Region + Exhume,
+    R::Flat: Region,
+{
+    fn default() -> Self {
+        Self {
+            pending: FlatStack::default(),
+            ready: VecDeque::default(),
+            current: None,
+        }
+    }
+}
+
+impl<R> ContainerBuilder for ZeroCopyBuilder<R>
+where
+    R: Clone + Default + Entomb + Exhume + Region + 'static,
+    R::Flat: Clone + Exhume<DerefWrapper<Arc<Vec<u8>>>> + 'static,
+{
+    type Container = ZeroCopyWrapper<FlatStack<R::Flat>>;
+
+    fn extract(&mut self) -> Option<&mut Self::Container> {
+        self.current = self.ready.pop_front().map(|buffer| {
+            let buffer = Arc::new(buffer);
+            // Byte length of the entombed buffer.
+            let length = buffer.len();
+            ZeroCopyWrapper {
+                buffer,
+                length,
+                _marker: PhantomData,
+            }
+        });
+        self.current.as_mut()
+    }
+
+    fn finish(&mut self) -> Option<&mut Self::Container> {
+        if !self.pending.is_empty() {
+            let mut length = 0;
+            type W<'a> = DefaultFlatWrite<&'a mut Vec<u8>>;
+            self.pending.flat_size::<W<'_>>(&mut length);
+            W::finish_size(&mut length);
+            let mut buffer = Vec::with_capacity(length);
+            let mut write = DefaultFlatWrite::new(&mut buffer);
+            self.pending.entomb(&mut write).unwrap();
+            self.ready.push_back(buffer);
+            // Reset `pending` so the flushed data is not entombed again.
+            self.pending.clear();
+        }
+
+        self.extract()
+    }
+}
+
+impl<R, T> PushInto<T> for ZeroCopyBuilder<R>
+where
+    R: Region + Entomb + Exhume + Push<T>,
+    R::Flat: Region,
+{
+    fn push_into(&mut self, item: T) {
+        self.pending.copy(item);
+
+        // Estimate `pending` size in bytes
+        let mut length = 0;
+        type W<'a> = DefaultFlatWrite<&'a mut Vec<u8>>;
+        self.pending.flat_size::<W<'_>>(&mut length);
+        W::finish_size(&mut length);
+        if length > 1024 {
+            let mut buffer = Vec::with_capacity(length);
+            let mut write = DefaultFlatWrite::new(&mut buffer);
+            self.pending.entomb(&mut write).unwrap();
+            self.ready.push_back(buffer);
+            // Reset `pending` so the flushed data is not entombed again.
+            self.pending.clear();
+        }
+    }
+}
+
+/// A cheaply cloneable container backed by an entombed byte buffer shared behind an [`Arc`].
+pub struct ZeroCopyWrapper<R> {
+    buffer: Arc<Vec<u8>>,
+    length: usize,
+    _marker: PhantomData<R>,
+}
+
+impl<R> Clone for ZeroCopyWrapper<R> {
+    fn clone(&self) -> Self {
+        Self {
+            buffer: Arc::clone(&self.buffer),
+            length: self.length,
+            _marker: PhantomData,
+        }
+    }
+}
+
+impl<R> Default for ZeroCopyWrapper<R> {
+    fn default() -> Self {
+        Self {
+            buffer: Arc::new(Vec::new()),
+            length: 0,
+            _marker: PhantomData,
+        }
+    }
+}
+
+impl<R> Container for ZeroCopyWrapper<FlatStack<R>>
+where
+    for<'a> R: Exhume<DerefWrapper<Arc<Vec<u8>>>> + Region + 'static,
+{
+    type ItemRef<'a> = R::ReadItem<'a> where Self: 'a;
+    type Item<'a> = R::ReadItem<'a> where Self: 'a;
+
+    fn len(&self) -> usize {
+        self.length
+    }
+
+    fn clear(&mut self) {
+        todo!()
+    }
+
+    type Iter<'a> = std::iter::Empty<Self::ItemRef<'a>>;
+
+    fn iter(&self) -> Self::Iter<'_> {
+        std::iter::empty()
+    }
+
+    type DrainIter<'a> = std::iter::Empty<Self::Item<'a>>;
+
+    fn drain(&mut self) -> Self::DrainIter<'_> {
+        std::iter::empty()
+    }
+}
diff --git a/timely/examples/zero_copy.rs b/timely/examples/zero_copy.rs
new file mode 100644
index 000000000..96f755229
--- /dev/null
+++ b/timely/examples/zero_copy.rs
@@ -0,0 +1,139 @@
+use std::collections::HashMap;
+
+use rand::{Rng, SeedableRng, rngs::SmallRng};
+
+use timely::dataflow::operators::{ToStream, Concat, Feedback, ConnectLoop};
+use timely::dataflow::operators::generic::operator::Operator;
+use timely::dataflow::channels::pact::Exchange;
+use timely::dataflow::operators::core::ToStreamBuilder;
+use timely_container::zero_copy::ZeroCopyBuilder;
+
+fn main() {
+
+    // command-line args: numbers of nodes and edges in the random graph.
+    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
+    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
+
+    // let logging = ::timely::logging::to_tcp_socket();
+    timely::execute_from_args(std::env::args().skip(3), move |worker| {
+
+        let index = worker.index();
+        let peers = worker.peers();
+
+        let mut rng: SmallRng = SeedableRng::seed_from_u64(index as u64);
+
+        // pending edges and node updates.
+        let mut edge_list = Vec::new();
+        let mut node_lists = HashMap::new();
+
+        // graph data; offsets into targets.
+        let mut offsets = Vec::new();
+        let mut targets = Vec::new();
+
+        // holds the bfs parent of each node, or u32::max_value() if unset.
+        let mut done = vec![u32::max_value(); 1 + (nodes / peers)];
+
+        let start = std::time::Instant::now();
+
+        worker.dataflow::<usize, _, _>(move |scope| {
+
+            // generate part of a random graph.
+            let iter = (0..edges / peers)
+                .map(move |_| (rng.gen_range(0..nodes as u32), rng.gen_range(0..nodes as u32)));
+            let graph = ToStreamBuilder::<ZeroCopyBuilder<_>>::to_stream_with_builder(iter, scope);
+
+            // define a loop variable, for the (node, worker) pairs.
+            let (handle, stream) = scope.feedback(1usize);
+
+            // use the stream of edges
+            graph.binary_notify(
+                &stream,
+                Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
+                Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
+                "BFS",
+                vec![],
+                move |input1, input2, output, notify| {
+
+                    // receive edges, start to sort them
+                    input1.for_each(|time, data| {
+                        notify.notify_at(time.retain());
+                        edge_list.push(data.replace(Vec::new()));
+                    });
+
+                    // receive (node, worker) pairs, note any new ones.
+                    input2.for_each(|time, data| {
+                        node_lists.entry(time.time().clone())
+                            .or_insert_with(|| {
+                                notify.notify_at(time.retain());
+                                Vec::new()
+                            })
+                            .push(data.replace(Vec::new()));
+                    });
+
+                    notify.for_each(|time, _num, _notify| {
+
+                        // maybe process the graph
+                        if *time == 0 {
+
+                            // print some diagnostic timing information
+                            if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }
+
+                            // sort the edges (previously: radix sorted).
+                            edge_list.sort();
+
+                            let mut count = 0;
+                            for buffer in &edge_list { count += buffer.len(); }
+
+                            // allocate sufficient memory, to avoid resizing.
+                            offsets = Vec::with_capacity(1 + (nodes / peers));
+                            targets = Vec::with_capacity(count);
+
+                            // construct the graph
+                            offsets.push(0);
+                            let mut prev_node = 0;
+                            for buffer in edge_list.drain(..) {
+                                for (node, edge) in buffer {
+                                    let temp = node / peers as u32;
+                                    while prev_node < temp {
+                                        prev_node += 1;
+                                        offsets.push(targets.len() as u32)
+                                    }
+                                    targets.push(edge);
+                                }
+                            }
+                            while offsets.len() < offsets.capacity() {
+                                offsets.push(targets.len() as u32);
+                            }
+                        }
+
+                        // print some diagnostic timing information
+                        if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }
+
+                        if let Some(mut todo) = node_lists.remove(&time) {
+                            let mut session = output.session(&time);
+
+                            // we could sort these, or not (previously: radix sorted).
+                            // todo.sort();
+
+                            for buffer in todo.drain(..) {
+                                for (node, prev) in buffer {
+                                    let temp = (node as usize) / peers;
+                                    if done[temp] == u32::max_value() {
+                                        done[temp] = prev;
+                                        let lower = offsets[temp] as usize;
+                                        let upper = offsets[temp + 1] as usize;
+                                        for &target in &targets[lower..upper] {
+                                            session.give((target, node));
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    });
+                }
+            )
+            .concat(&(0..1).map(|x| (x,x)).to_stream(scope))
+            .connect_loop(handle);
+        });
+    }).unwrap(); // asserts error-free execution
+}