
Commit

wip
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
antiguru committed Jun 14, 2024
1 parent a89c4f6 commit 2757402
Showing 4 changed files with 289 additions and 1 deletion.
3 changes: 2 additions & 1 deletion container/Cargo.toml
@@ -7,5 +7,6 @@ license = "MIT"

[dependencies]
columnation = { git = "https://github.com/frankmcsherry/columnation" }
-flatcontainer = "0.4"
+#flatcontainer = "0.4"
+flatcontainer = { path = "../../flatcontainer" }
serde = { version = "1.0"}
1 change: 1 addition & 0 deletions container/src/lib.rs
@@ -6,6 +6,7 @@ use std::collections::VecDeque;

pub mod columnation;
pub mod flatcontainer;
pub mod zero_copy;

/// A container transferring data through dataflow edges
///
147 changes: 147 additions & 0 deletions container/src/zero_copy.rs
@@ -0,0 +1,147 @@
//! Zero-copy container builders
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::sync::Arc;
use flatcontainer::{FlatStack, Push, Region};
use flatcontainer::flatten::{DefaultFlatWrite, DerefWrapper, Entomb, Exhume};
use crate::{Container, ContainerBuilder, PushInto};

type Buffer = DerefWrapper<Arc<Vec<u8>>>;

/// A container builder that collects pushed items into a pending [`FlatStack`] and
/// entombs them into contiguous byte buffers, which are handed out as shareable,
/// zero-copy containers.
pub struct ZeroCopyBuilder<R>
where
R: Region + Exhume<Buffer>,
{
pending: FlatStack<R>,
ready: VecDeque<Vec<u8>>,
current: Option<ZeroCopyWrapper<FlatStack<R::Flat>>>,
}

impl<R> Default for ZeroCopyBuilder<R>
where
R: Region + Exhume<Buffer>,
R::Flat: Region,
{
fn default() -> Self {
Self {
pending: FlatStack::default(),
ready: VecDeque::default(),
current: None,
}
}
}

impl<R> ContainerBuilder for ZeroCopyBuilder<R>
where
R: Clone + Default + Entomb + Exhume<Buffer> + Region + 'static,
R::Flat: Clone + Exhume<Buffer> + 'static,
{
type Container = ZeroCopyWrapper<FlatStack<R::Flat>>;

fn extract(&mut self) -> Option<&mut Self::Container> {
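// Move the next ready buffer behind an `Arc`, so the resulting container can be
// cloned and shared without copying the entombed bytes.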
self.current = self.ready.pop_front().map(|buffer| {
let buffer = Arc::new(buffer);
let length = buffer.len();
ZeroCopyWrapper {
buffer,
length,
_marker: PhantomData,
}
});
self.current.as_mut()
}

fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.pending.is_empty() {
let mut length = 0;
type W<'a> = DefaultFlatWrite<&'a mut Vec<u8>>;
self.pending.flat_size::<W<'_>>(&mut length);
W::finish_size(&mut length);
let mut buffer = Vec::with_capacity(length);
let mut write = DefaultFlatWrite::new(&mut buffer);
self.pending.entomb(&mut write).unwrap();
self.ready.push_back(buffer);
// Reset the pending stack so repeated `finish` calls do not entomb the same items again.
self.pending.clear();
}

self.extract()
}
}

impl<R, T> PushInto<T> for ZeroCopyBuilder<R>
where
R: Region + Entomb + Exhume<Buffer> + Push<T>,
R::Flat: Region,
{
fn push_into(&mut self, item: T) {
self.pending.copy(item);

// Estimate `pending` size in bytes
let mut length = 0;
type W<'a> = DefaultFlatWrite<&'a mut Vec<u8>>;
self.pending.flat_size::<W<'_>>(&mut length);
W::finish_size(&mut length);
if length > 1024 {
let mut buffer = Vec::with_capacity(length);
let mut write = DefaultFlatWrite::new(&mut buffer);
self.pending.entomb(&mut write).unwrap();
self.ready.push_back(buffer);
// Reset the pending stack so already-entombed items are not written a second time.
self.pending.clear();
}
}
}

/// A read-only container of entombed items, backed by a reference-counted byte
/// buffer that can be cloned and shared without copying the underlying data.
pub struct ZeroCopyWrapper<R> {
buffer: Arc<Vec<u8>>,
length: usize,
_marker: PhantomData<R>,
}

impl<R> Clone for ZeroCopyWrapper<R> {
fn clone(&self) -> Self {
Self {
buffer: Arc::clone(&self.buffer),
length: self.length,
_marker: PhantomData,
}
}
}

impl<R> Default for ZeroCopyWrapper<R> {
fn default() -> Self {
Self {
buffer: Arc::new(Vec::new()),
length: 0,
_marker: PhantomData,
}
}
}

impl<R> Container for ZeroCopyWrapper<FlatStack<R>>
where
R: Exhume<Buffer> + Region + 'static,
{
type ItemRef<'a> = R::ReadItem<'a> where Self: 'a;
type Item<'a> = R::ReadItem<'a> where Self: 'a;

fn len(&self) -> usize {
self.length
}

fn clear(&mut self) {
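// Clearing an entombed, shared buffer is not yet implemented in this WIP.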
todo!()
}

type Iter<'a> = std::iter::Empty<R::ReadItem<'a>>;

fn iter(&self) -> Self::Iter<'_> {
std::iter::empty()
}

type DrainIter<'a> = std::iter::Empty<R::ReadItem<'a>>;

fn drain(&mut self) -> Self::DrainIter<'_> {
std::iter::empty()
}
}
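
A minimal usage sketch of the new builder, illustrative only and not part of this diff: it assumes the local flatcontainer branch pointed to by Cargo.toml implements Entomb/Exhume (and the Flat associated region) for MirrorRegion<u32>; the region choice and the main function below are hypothetical.

// Hypothetical sketch: assumes the local flatcontainer branch provides
// Entomb/Exhume (and a Flat region) for MirrorRegion<u32>.
use flatcontainer::MirrorRegion;
use timely_container::zero_copy::ZeroCopyBuilder;
use timely_container::{Container, ContainerBuilder, PushInto};

fn main() {
    // Builder over a simple region of `u32` values.
    let mut builder = ZeroCopyBuilder::<MirrorRegion<u32>>::default();

    // Push items; whenever the estimated pending size exceeds 1024 bytes, the
    // builder entombs the pending stack into a `Vec<u8>` and queues it as ready.
    for i in 0u32..100_000 {
        builder.push_into(i);
    }

    // `finish` flushes anything still pending, then hands out the ready,
    // entombed containers one at a time until the queue is drained.
    while let Some(container) = builder.finish() {
        println!("entombed container holds {} items", container.len());
    }
}

The 1024-byte flush threshold and the Arc-backed wrapper come straight from the implementation above; only the concrete region and item type are assumed.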
139 changes: 139 additions & 0 deletions timely/examples/zero_copy.rs
@@ -0,0 +1,139 @@
use std::collections::HashMap;

use rand::{Rng, SeedableRng, rngs::SmallRng};

use timely::dataflow::operators::{ToStream, Concat, Feedback, ConnectLoop};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::core::ToStreamBuilder;
use timely_container::zero_copy::ZeroCopyBuilder;

fn main() {

// command-line args: numbers of nodes and edges in the random graph.
let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();

// let logging = ::timely::logging::to_tcp_socket();
timely::execute_from_args(std::env::args().skip(3), move |worker| {

let index = worker.index();
let peers = worker.peers();

let mut rng: SmallRng = SeedableRng::seed_from_u64(index as u64);

// pending edges and node updates.
let mut edge_list = Vec::new();
let mut node_lists = HashMap::new();

// graph data; offsets into targets.
let mut offsets = Vec::new();
let mut targets = Vec::new();

// holds the bfs parent of each node, or u32::max_value() if unset.
let mut done = vec![u32::max_value(); 1 + (nodes / peers)];

let start = std::time::Instant::now();

worker.dataflow::<usize,_,_>(move |scope| {

// generate part of a random graph.
let iter = (0..edges / peers)
.map(move |_| (rng.gen_range(0..nodes as u32), rng.gen_range(0..nodes as u32)));
let graph = ToStreamBuilder::<ZeroCopyBuilder<_>>::to_stream_with_builder(iter, scope);

// define a loop variable, for the (node, worker) pairs.
let (handle, stream) = scope.feedback(1usize);

// use the stream of edges
graph.binary_notify(
&stream,
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
"BFS",
vec![],
move |input1, input2, output, notify| {

// receive edges, start to sort them
input1.for_each(|time, data| {
notify.notify_at(time.retain());
edge_list.push(data.replace(Vec::new()));
});

// receive (node, worker) pairs, note any new ones.
input2.for_each(|time, data| {
node_lists.entry(time.time().clone())
.or_insert_with(|| {
notify.notify_at(time.retain());
Vec::new()
})
.push(data.replace(Vec::new()));
});

notify.for_each(|time, _num, _notify| {

// maybe process the graph
if *time == 0 {

// print some diagnostic timing information
if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }

// sort the edges (previously: radix sorted).
edge_list.sort();

let mut count = 0;
for buffer in &edge_list { count += buffer.len(); }

// allocate sufficient memory, to avoid resizing.
offsets = Vec::with_capacity(1 + (nodes / peers));
targets = Vec::with_capacity(count);

// construct the graph
offsets.push(0);
let mut prev_node = 0;
for buffer in edge_list.drain(..) {
for (node, edge) in buffer {
let temp = node / peers as u32;
while prev_node < temp {
prev_node += 1;
offsets.push(targets.len() as u32)
}
targets.push(edge);
}
}
while offsets.len() < offsets.capacity() {
offsets.push(targets.len() as u32);
}
}

// print some diagnostic timing information
if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }

if let Some(mut todo) = node_lists.remove(&time) {
let mut session = output.session(&time);

// we could sort these, or not (previously: radix sorted).
// todo.sort();

for buffer in todo.drain(..) {
for (node, prev) in buffer {
let temp = (node as usize) / peers;
if done[temp] == u32::max_value() {
done[temp] = prev;
let lower = offsets[temp] as usize;
let upper = offsets[temp + 1] as usize;
for &target in &targets[lower..upper] {
session.give((target, node));
}
}
}
}
}
});
}
)
.concat(&(0..1).map(|x| (x,x)).to_stream(scope))
.connect_loop(handle);
});
}).unwrap(); // asserts error-free execution;
}
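
The example reads the node and edge counts from its first two positional arguments and forwards the remaining arguments to timely's execute_from_args, so an invocation along the lines of "cargo run --example zero_copy -- 1000 10000 -w2" (run from the timely crate; the worker flag and exact invocation are illustrative) exercises the zero-copy builder end to end.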
