The title was meant to be a clever double-entendre, but re-reading it now it sounds pretty ominous. Not to worry, I'm not going anywhere.
What this post is actually about is a cool thing in differential dataflow called "arrangements". They probably have a real name too, but to get you excited they are the stream-processing equivalent of "indices" in a database. Streams of updates flow in to an arrangement, and what comes out the other side are "arranged" (thus the name) batches of updates, and a handle to a common shared index reflecting the batches.
These "arrangements" can make computations a lot more efficient, and enable behavior you don't see in other stream processing systems. So perhaps this post covers something new and interesting and unique to differential dataflow. Let's shoot for "interesting" at least, and we can try and sort out the other two later on.
Let's say you are the sort of person who likes graphs! As in: Pairs. Of. Integers. ZOMGYES!
(All of this applies to relational-y computations other than graphs, so stick around even if this isn't your thing.)
Imagine we have a stream of graph changes, things that look like
((src, dst), time, diff)
telling us when edges have been added to and removed from our graph. Pretty handy stuff.
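To make those triples concrete, here is a minimal standalone sketch (plain Rust, no differential dataflow involved) that accumulates updates into the set of edges as of some time. The names `Update` and `edges_at` are invented for illustration, and times are simplified to a single integer.

```rust
use std::collections::BTreeMap;

/// One update: ((src, dst), time, diff). Times are plain integers here,
/// a simplification of the timestamps used in the post.
type Update = ((u32, u32), u64, isize);

/// Accumulates all updates at times <= `time` and returns the edges
/// whose accumulated count is non-zero, in sorted order.
fn edges_at(updates: &[Update], time: u64) -> Vec<((u32, u32), isize)> {
    let mut counts: BTreeMap<(u32, u32), isize> = BTreeMap::new();
    for &(edge, t, diff) in updates {
        if t <= time {
            *counts.entry(edge).or_insert(0) += diff;
        }
    }
    counts.into_iter().filter(|&(_, c)| c != 0).collect()
}

fn main() {
    let updates: Vec<Update> = vec![
        ((0, 30), 0, 1),  // edge added at time 0
        ((0, 76), 0, 1),
        ((0, 30), 1, -1), // edge removed at time 1
        ((1, 62), 1, 1),
    ];
    println!("at time 0: {:?}", edges_at(&updates, 0));
    println!("at time 1: {:?}", edges_at(&updates, 1));
}
```

The point is only that the collection at any time is the sum of the diffs up to that time; additions and removals are the same kind of record.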
Let's write a simple computation against these edges. For example, let's write the computation that, when provided with a query node, tells you the names of the nodes you can reach from the query. So, edges will be our differential dataflow collection of edges, and query will be a collection containing node identifiers we are interested in learning about.
edges.semijoin(&query)
.inspect(|x| println!("reachable @ 1: {:?}", x))
All that we've done is take our edges collection, and restrict it down to those pairs (src, dst) where src is in query. That is what semijoin does (it's like a join, but where one side doesn't have any data to contribute).
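If the semijoin is not familiar, here is roughly what it computes, sketched on static data in plain Rust. This is not the differential dataflow operator (there is nothing incremental here), and the function name simply borrows the operator's name for illustration.

```rust
use std::collections::HashSet;

/// Keeps the (src, dst) pairs whose src appears in `query`.
/// A static stand-in for the restriction described above.
fn semijoin(edges: &[(u32, u32)], query: &HashSet<u32>) -> Vec<(u32, u32)> {
    edges
        .iter()
        .copied()
        .filter(|(src, _)| query.contains(src))
        .collect()
}

fn main() {
    let edges = vec![(0, 30), (0, 76), (1, 62), (2, 43)];
    let query: HashSet<u32> = vec![0u32].into_iter().collect();
    println!("{:?}", semijoin(&edges, &query));
}
```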
Let's look at a bit of output here, just to get us all on the same page about what we are seeing. I wrote a program that loads up and then randomly mutates a graph, and while it is doing this issues and then retracts query nodes. I made the graph not very big so that the output would be manageable.
reachable @ 1: ((30, 0), (Root, 0), 1)
reachable @ 1: ((76, 0), (Root, 0), 1)
reachable @ 1: ((30, 0), (Root, 1), -1)
reachable @ 1: ((76, 0), (Root, 1), -1)
reachable @ 1: ((62, 1), (Root, 1), 1)
reachable @ 1: ((62, 1), (Root, 2), -1)
reachable @ 1: ((43, 2), (Root, 2), 1)
reachable @ 1: ((43, 2), (Root, 3), -1)
reachable @ 1: ((58, 3), (Root, 3), 1)
reachable @ 1: ((83, 3), (Root, 3), 1)
...
I've re-arranged the output a little bit to try to make clearer what is going on. At time (Root, 0), which is the first time, we issued a query for node 0. We got to see that both 30 and 76 are reachable at this point. This is great!
We then move on to the next time, (Root, 1), where we see these outputs negated. This is because we removed node 0 from the query set. Differential dataflow maintains computations, and if we didn't remove node 0 we would in essence have a standing query for the one-hop neighbors of that node (also a cool thing to do, but not what we are doing here). We also see that node 1 can reach node 62.
We move through some more times, seeing the retraction of the outputs for the previous (and now retracted) queries and the introduction of the outputs for the next query.
Let's make the computation a bit more interesting. How about instead of looking up the neighbors of query nodes, we look up their two-hop neighborhoods? We will need to change the program a little bit, because we were relying on the existence of the query node in the output edge to tell us who is connected to which query; we'll add the query node in as data now, and keep it around:
query.map(|q| (q, q))
.join_map(&edges, |_src, q, dst| (dst, q))
.inspect(|x| println!("reachable @ 1: {:?}", x))
.join_map(&edges, |_src, q, dst| (dst, q))
.inspect(|x| println!("reachable @ 2: {:?}", x))
Here we grow a set of nodes, each labeled with the query that started things out. First we get the one-hop neighbors, and then from those nodes we take another step to the two-hop neighbors. Importantly, we hold on to the query identifier q as we do this, so that we can interpret the result when there are multiple changes to query at the same time (e.g. an addition and a deletion, but also if you wanted to ask about multiple nodes at the same time, or leave some queries standing as the graph changes).
I've got the output from the computation, but we aren't going to go out as far (to save you the reading). Out to (Root, 2) we have
reachable @ 1: ((30, 0), (Root, 0), 1)
reachable @ 1: ((76, 0), (Root, 0), 1)
reachable @ 2: ((19, 0), (Root, 0), 1)
reachable @ 2: ((87, 0), (Root, 0), 1)
reachable @ 1: ((30, 0), (Root, 1), -1)
reachable @ 1: ((76, 0), (Root, 1), -1)
reachable @ 2: ((19, 0), (Root, 1), -1)
reachable @ 2: ((87, 0), (Root, 1), -1)
reachable @ 1: ((62, 1), (Root, 1), 1)
reachable @ 1: ((62, 1), (Root, 2), -1)
reachable @ 1: ((43, 2), (Root, 2), 1)
reachable @ 2: ((73, 2), (Root, 2), 1)
reachable @ 1: ((43, 2), (Root, 3), -1)
reachable @ 2: ((73, 2), (Root, 3), -1)
We start with our friend node 0, and see that not only can it reach 30 and 76 in one step, it can reach 19 and 87 in two steps. We then say goodbye to node 0 and its friends in the next round, learning that node 1 can reach 62 as before but goes no further. Lastly, for this example, we get to see that node 2 can make it to node 73 in two steps.
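The "keep the query label around" trick can be sketched on static data in plain Rust. The helpers `index_edges` and `hop` are hypothetical names, and the edge list is made up to be consistent with the output above (the post never says which of 30 and 76 leads to 19 or 87).

```rust
use std::collections::HashMap;

/// Builds an adjacency index: src -> list of dst.
fn index_edges(edges: &[(u32, u32)]) -> HashMap<u32, Vec<u32>> {
    let mut index: HashMap<u32, Vec<u32>> = HashMap::new();
    for &(src, dst) in edges {
        index.entry(src).or_default().push(dst);
    }
    index
}

/// One step: for each (node, query) pair, emit (dst, query) for every
/// edge (node, dst). The query label rides along unchanged.
fn hop(frontier: &[(u32, u32)], index: &HashMap<u32, Vec<u32>>) -> Vec<(u32, u32)> {
    let mut out = Vec::new();
    for &(node, q) in frontier {
        if let Some(dsts) = index.get(&node) {
            out.extend(dsts.iter().map(|&dst| (dst, q)));
        }
    }
    out.sort();
    out
}

fn main() {
    // Made-up edges, chosen to match the shape of the output above.
    let edges = vec![(0, 30), (0, 76), (30, 19), (76, 87), (1, 62)];
    let index = index_edges(&edges);
    let start = vec![(0, 0)]; // plays the role of query.map(|q| (q, q))
    let one = hop(&start, &index);
    let two = hop(&one, &index);
    println!("reachable @ 1: {:?}", one);
    println!("reachable @ 2: {:?}", two);
}
```

Note that both hops read the same `index`. In this static setting that sharing is free; getting the same sharing in a dataflow system is the subject of the rest of the post.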
I think this computation is actually sort of cool, but that isn't why we are here. Let's quickly agree that we could compute the neighbors out at any distance, by repeating the pattern of joining we had up above, for example
query.map(|q| (q, q))
.join_map(&edges, |_src, q, dst| (dst, q))
.join_map(&edges, |_src, q, dst| (dst, q))
.join_map(&edges, |_src, q, dst| (dst, q))
.join_map(&edges, |_src, q, dst| (dst, q))
.join_map(&edges, |_src, q, dst| (dst, q))
.inspect(|x| println!("reachable @ 5: {:?}", x))
This totally works in differential dataflow, and depending on the graph we might get quick answers about neighborhoods, or the neighborhood might explode and produce epic volumes of node identifiers.
While this does "totally work", it totally works somewhat inefficiently. Each instance of join_map uses edges as an input, and internally each instance will index and maintain all of the changes that edges undergoes. Each instance will independently (i) shuffle the updates in edges, (ii) do some compute to maintain the index, and (iii) maintain a copy of the index in memory.
Each instance needs to do this because efficiently implementing a streaming join where either input could change requires indexed representations of each input.
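To see why both sides need an index, here is a toy incremental join in plain Rust. It is only a sketch under simplifying assumptions (no timestamps, one update at a time, made-up names like `ToyJoin`), not how differential dataflow implements join: a change arriving on one input is matched against everything accumulated on the other input, and then remembered for future changes coming the other way.

```rust
use std::collections::HashMap;

/// An output change: ((key, left value, right value), diff).
type Output = ((u32, u32, u32), isize);

/// A toy incremental equi-join on u32 keys. Each input keeps its own
/// index of key -> accumulated (value, diff) updates.
#[derive(Default)]
struct ToyJoin {
    left: HashMap<u32, Vec<(u32, isize)>>,
    right: HashMap<u32, Vec<(u32, isize)>>,
}

impl ToyJoin {
    /// Applies one update to the left input; returns the output changes.
    fn push_left(&mut self, key: u32, val: u32, diff: isize) -> Vec<Output> {
        let mut out = Vec::new();
        if let Some(vs) = self.right.get(&key) {
            for &(v2, d2) in vs {
                out.push(((key, val, v2), diff * d2));
            }
        }
        self.left.entry(key).or_default().push((val, diff));
        out
    }

    /// Applies one update to the right input; returns the output changes.
    fn push_right(&mut self, key: u32, val: u32, diff: isize) -> Vec<Output> {
        let mut out = Vec::new();
        if let Some(vs) = self.left.get(&key) {
            for &(v1, d1) in vs {
                out.push(((key, v1, val), d1 * diff));
            }
        }
        self.right.entry(key).or_default().push((val, diff));
        out
    }
}

fn main() {
    let mut join = ToyJoin::default();
    println!("{:?}", join.push_right(0, 30, 1)); // edge (0, 30) arrives first
    println!("{:?}", join.push_left(0, 0, 1));   // query for node 0 arrives
    println!("{:?}", join.push_right(0, 76, 1)); // a new edge matches the query
    println!("{:?}", join.push_left(0, 0, -1));  // retracting the query
}
```

Five such joins against the same edges would each hold their own copy of the `right` index, which is exactly the redundancy in question.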
It is probably clear that each of these operators produces the exact same index, wasting communication, computation, and memory that could be used elsewhere. We would love to do that indexing work only once, but .. how do we do this in a dataflow system, where each of these computations is meant to be independent?
Let's start with the correct answer to the problem above, and then we can wax philosophical on what makes this work out.
let edges = edges.arrange_by_key();
query.map(|q| (q, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.inspect(|x| println!("reachable @ 5: {:?}", x))
This program maintains just one indexed copy of edges, with no redundant communication, computation, or memory use.
We are taking advantage of a fairly serious architectural property of differential dataflow computations, one not shared by several other stream processors: each instance of each operator is partitioned across workers by key, consistently. Each of the join operators uses the same partition of the key space to partition its implementation across workers, and so each operator in each worker can rely on access to a local copy of the same data.
In fact, we try to make this clearer by separating the implementation of join into two parts: first arrange the data, and second apply operator logic on the arranged data. Here is the actual implementation of join_map from up above:
let arranged1 = self.arrange_by_key();
let arranged2 = other.arrange_by_key();
arranged1.join_arranged(&arranged2, move |k,v1,v2| logic(&k.item,v1,v2))
All that we are doing is breaking out one of these steps, where we arrange edges, and re-using that work.
Let's look a bit closer at what these arrangements are, which should give a bit more insight as to how and why this could possibly work. First off, remember that before arranging the data, we had something that moved changes along a timely dataflow stream, roughly:
Stream<(Data, Time, Diff)>
This is a pretty handy and generic way to work with updates, as there is no structure imposed.
An arrangement is going to look more like this:
pub struct Arrangement<K, V, T, R> {
changes: Stream<Rc<Batch<K, V, T, R>>>,
trace: Rc<RefCell<Trace<K, V, T, R>>>,
}
This isn't yet much clearer, but bear with me!
A Batch is a type representing an indexed pile of updates. In differential dataflow, we've chosen to index things first by key K, then by value V, and finally by time T. The R is something that you should probably think of as isize, reflecting changes in counts. Each batch is just some pile of updates, indexed in this scheme. Rather than move update records along our timely dataflow stream, we are going to move batches of update records. And super importantly, we are going to move immutable reference-counted batches, so that all of the recipients get to share the same hunk of memory.
A Trace is just an append-only sequence of batches. It will be just those batches we have sent along the stream, so that operators can get access to historical updates (both join and group need to do this). We append to the list and compact batches as they are added so the trace isn't technically immutable (viz. RefCell), but it is shared among all of the users (viz. Rc).
Each operator, like join and group, is now written to work off of an Arrangement: a stream of pre-indexed updates and a shared reference to the compacted historical updates.
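The sharing is easier to believe when you can poke at it. Below is a toy stand-in in plain Rust, with made-up names (`Arranger`, `seal`, `trace_handle`) and none of the real types: batches are sorted vectors behind an Rc, and the trace is a shared list of those same Rcs. It does not compact anything, which the real trace does.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// An immutable, sorted pile of (key, val, time, diff) updates.
type Batch = Vec<(u32, u32, u64, isize)>;

/// The shared history: every batch that has been sent so far.
#[derive(Default)]
struct Trace {
    batches: Vec<Rc<Batch>>,
}

/// Toy stand-in for an arrangement: it hands out reference-counted
/// batches and keeps a shared trace of everything handed out.
struct Arranger {
    trace: Rc<RefCell<Trace>>,
}

impl Arranger {
    fn new() -> Self {
        Arranger { trace: Rc::new(RefCell::new(Trace::default())) }
    }

    /// Sorts a pile of updates into a batch, records it in the trace,
    /// and returns the same allocation for downstream operators.
    fn seal(&self, mut updates: Batch) -> Rc<Batch> {
        updates.sort();
        let batch = Rc::new(updates);
        self.trace.borrow_mut().batches.push(Rc::clone(&batch));
        batch
    }

    /// A second handle to the same trace, as another operator would hold.
    fn trace_handle(&self) -> Rc<RefCell<Trace>> {
        Rc::clone(&self.trace)
    }
}

fn main() {
    let arranger = Arranger::new();
    let reader = arranger.trace_handle();
    let batch = arranger.seal(vec![(1, 62, 1, 1), (0, 30, 0, 1)]);
    // The batch on the "stream" and the batch in the trace are one allocation.
    assert!(Rc::ptr_eq(&batch, &reader.borrow().batches[0]));
    println!("owners of the batch: {}", Rc::strong_count(&batch));
}
```

Because the batches are immutable, handing the same Rc to any number of operators is safe; only the list of batches sits behind the RefCell.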
There are some downsides to using arranged data, although more exciting implementations could probably try to work around them. Mainly, by sharing the data we also need to share the restrictions we impose on the data when it comes to compaction of the history. If we have some operator that is failing to move forward in time, or doing so slowly, it may require the trace to maintain historical data that others do not need so that it can operate correctly. Other operators will have to flip through historical distinctions that they themselves wouldn't have maintained, and could execute less efficiently than if they had their own personally compacted representation.
We've also bought pretty hard into immutability, which lines up nicely with shared data, but can be inefficient and wouldn't be necessary for uniquely owned indices. This prevents updating in place, and moves us towards a strategy of appending and merging immutable batches, and doing a great many allocations. I personally think this is responsible, as looking forward we are going to be "sharing" our trace data with the fault-tolerance mechanisms, who will benefit from immutably defined batches of data to transcribe and recover from.
This arrangement stuff has existed for years; the Rust version of differential dataflow has had it for a while. Why write about it now?
We've started to do some things that I think are pretty neat, and really start to make differential dataflow capable of (more) things that other systems are not. Let's walk through a few more examples to get there.
Differential dataflow uses timely dataflow scopes to effect iterative computations. For example, if you wanted to generalize the neighborhood queries to arbitrary distances, you could write an iterative computation to do that:
let roots = query.map(|q| (q, 0));
roots.iterate(|dists| {
let edges = edges.enter(&dists.scope());
let roots = roots.enter(&dists.scope());
dists.join_map(&edges, |_k,l,d| (*d, l+1))
.concat(&roots)
.group(|_, s, t| t.push((s[0].0, 1)))
})
.map(|(_node, dist)| dist)
.consolidate()
This computation takes the query and edges collections from above, and sets the elements of roots to be the query nodes and a distance of zero. It then explores out from these roots, tagging each discovered node with a distance and retaining the smallest distance for each node. The results are then aggregated by distance, and we can see how many nodes are at each distance (though we spare ourselves having to see each individual node name).
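For reference, here is what that computation produces, written as an ordinary static breadth-first search in plain Rust. This is a stand-in, not the differential version: it runs once over fixed inputs rather than maintaining the result as edges and roots change, and `distance_counts` is a name made up for this sketch.

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};

/// Breadth-first distances from a set of roots, followed by a count of
/// how many nodes sit at each distance.
fn distance_counts(edges: &[(u32, u32)], roots: &[u32]) -> BTreeMap<u32, usize> {
    let mut adj: HashMap<u32, Vec<u32>> = HashMap::new();
    for &(src, dst) in edges {
        adj.entry(src).or_default().push(dst);
    }

    // Each node keeps the smallest distance at which it was discovered.
    let mut dist: HashMap<u32, u32> = HashMap::new();
    let mut queue = VecDeque::new();
    for &r in roots {
        if dist.insert(r, 0).is_none() {
            queue.push_back(r);
        }
    }
    while let Some(node) = queue.pop_front() {
        let d = dist[&node];
        for &next in adj.get(&node).into_iter().flatten() {
            if !dist.contains_key(&next) {
                dist.insert(next, d + 1);
                queue.push_back(next);
            }
        }
    }

    // Aggregate by distance, discarding the node names.
    let mut counts = BTreeMap::new();
    for &d in dist.values() {
        *counts.entry(d).or_insert(0) += 1;
    }
    counts
}

fn main() {
    // Made-up edges for illustration.
    let edges = [(0, 30), (0, 76), (30, 19), (76, 87), (19, 0)];
    println!("{:?}", distance_counts(&edges, &[0]));
}
```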
This computation also uses edges in a join; we should probably be able to re-use that arranged data again, right?
Well, not so fast, no. Awkwardly, the timestamps on the edge data here have an additional coordinate, corresponding to the iteration in which a change can happen. As it turns out none of the edges actually change, so all of the updates just have a zero in the corresponding coordinate, but the types are not correct to re-use the prior arrangement.
The first piece of good news is that abstractions are now in place to make this possible. A shim around arrangements, including their batches and the trace, can pretend to put zeros in that coordinate for all consumers. This isn't brilliantly done yet (each use of a time involves a copy that didn't happen before), and could be better (if Rust/LLVM can be statically convinced that the value is always zero, and optimize away the code that uses the coordinate), but you can write:
let edges = edges.arrange_by_key();
query.map(|q| (q, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.join_arranged(&edges, |_src, q, dst| (dst, q))
.inspect(|x| println!("reachable @ 5: {:?}", x))
let roots = query.map(|q| (q, 0));
roots.iterate(|dists| {
let edges = edges.enter(&dists.scope());
let roots = roots.enter(&dists.scope());
dists.join_arranged(&edges, |_k,l,d| (*d, l+1))
.concat(&roots)
.group(|_, s, t| t.push((s[0].0, 1)))
})
.map(|(_node, dist)| dist)
.consolidate()
It looks so easy, but behind the scenes we added an enter implementation to Arrangement, which builds a bit of proxy infrastructure and makes life not so hard for you. Just hard for me.
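The "pretend to put zeros in that coordinate" idea can be sketched in plain Rust as a lazy view over a batch. Everything here is hypothetical (`NestedTime`, the free function `enter`, integer outer times); it is meant to show the shape of the shim, not the code in differential dataflow. The underlying batch is not copied; only the times are rewritten as they are read.

```rust
/// A timestamp inside an iterative scope: the outer time plus a round.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct NestedTime {
    outer: u64,
    inner: u32,
}

/// Presents a batch of (data, time, diff) updates as if each time had an
/// extra iteration coordinate, always zero. The data itself is borrowed.
fn enter<'a, D>(
    batch: &'a [(D, u64, isize)],
) -> impl Iterator<Item = (&'a D, NestedTime, isize)> + 'a {
    batch
        .iter()
        .map(|(d, t, r)| (d, NestedTime { outer: *t, inner: 0 }, *r))
}

fn main() {
    let batch = vec![((0u32, 30u32), 0u64, 1isize), ((0, 30), 1, -1)];
    for (edge, time, diff) in enter(&batch) {
        println!("{:?} @ {:?}: {}", edge, time, diff);
    }
}
```

The per-use construction of a `NestedTime` is the "copy that didn't happen before" mentioned above.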
Still just one copy of the edges in this computation.
This is where you may need to sit down. If you were reading this standing up, good on you though.
We've seen how arranged data can be used within a dataflow, where each dataflow has the property that it gets built with "whole-dataflow" knowledge, and every operator sees the same stream of batches fly past. I mean, how could it be any other way?
Here is something you can write in differential dataflow, and we are going to talk through how cool and amazing it is.
// create a dataflow managing an ever-changing edge collection.
let edges = worker.dataflow(|scope| {
// edges come from somewhere (elided, because unimportant/messy)
edges.arrange_by_key()
.trace
});
// [...]
// create a dataflow for distance 2 neighbors.
let mut query = worker.dataflow(|scope| {
let edges = edges.import(scope);
let (input, query) = scope.new_input();
let query = query.as_collection();
query.map(|x| (x, x))
.join_arranged(&edges, |_n, &q, &d| (d, q))
.join_arranged(&edges, |_n, &q, &d| (d, q))
.inspect(|x| println!("neighbors @ 2: {:?}", x));
input
});
Hrm. What is going on?
We have one dataflow that arranges edges and returns the arranged trace. We don't really know where the edges are coming from, and it may be instructive that the rest of our program actually makes sense even without knowing this.
We also create a second dataflow to report the neighbors at distance two from some query nodes. This looks a lot like what we did up above, but it is now in a different dataflow. Big deal? Doesn't seem like it to me.
The awesome line up there is the one we haven't mentioned yet:
// [...]
Stuff can happen between defining these two dataflows. Like, we could run the first dataflow for five minutes or so, and only then form the second dataflow. We are attaching a second dataflow to a live source of data. And, if I've done everything correctly (place your bets), it does something sane.
Let's talk through two bad things we could do when attaching to live data, and then talk through the third thing that I think is sane.
- We could just attach the stream of updates, so that we start feeding all new changes at our operators. This could plausibly work for something like join, but we are going to start seeing all sorts of deletions of elements we never saw added, and any logic more complicated than a join is just going to have mysterious semantics. We are missing important history!
- We could keep around the full history of edges, and replay all of the batches that we ever sent along the stream. This would be like starting the second dataflow from the same point as the first dataflow started, which is possible because we kept so much data around. But we really kept a lot of data around, didn't we? What if // [...] actually waits a few hours, or days, or whatever? Are we really going to keep everything, forever?
Actually the second option isn't horrible, it is much better than the first in my opinion, and will be a special case of what we actually do. Which is ...
The .trace that got returned from the first dataflow is an instance of the TraceReader trait. It acts as a capability for reading data out of the trace at a certain time. This means that it has a handle to the trace data, but also that it guarantees that the trace data will not be compacted beyond a certain point in time. Of course, you can downgrade the capability (to a future time), allowing the trace to compact itself as the computations move along. Each of the operators uses these capabilities to share the trace and allow it to creep forward at the rate each is comfortable with.
If in the // [...] we regularly downgrade the edges capability too, then the trace can merge the batches it has sent.
When you .import(scope) a trace, we will make a new source of updates that first feeds all of the compacted historical batches the trace currently has, followed by the stream of new batches of updates as they are accepted into the trace.
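Here is a rough sketch of that interplay in plain Rust. It assumes a single reader and integer times, and the names (`ToyTrace`, `downgrade_and_compact`, `import`) are made up for the sketch rather than taken from the TraceReader trait. Downgrading the capability to a time lets the trace move older updates up to that time and cancel whatever now collides; importing hands back the compacted history.

```rust
use std::collections::BTreeMap;

/// ((src, dst), time, diff), with integer times for simplicity.
type Update = ((u32, u32), u64, isize);

/// Toy trace: a list of batches plus the earliest time the (single)
/// reader still needs to distinguish.
struct ToyTrace {
    batches: Vec<Vec<Update>>,
    since: u64,
}

impl ToyTrace {
    fn new() -> Self {
        ToyTrace { batches: Vec::new(), since: 0 }
    }

    /// Accepts a new batch of updates into the trace.
    fn push(&mut self, batch: Vec<Update>) {
        self.batches.push(batch);
    }

    /// Downgrades the capability to `time` and merges all batches into
    /// one, advancing older times to `time` and dropping updates whose
    /// diffs now sum to zero.
    fn downgrade_and_compact(&mut self, time: u64) {
        self.since = self.since.max(time);
        let mut merged: BTreeMap<((u32, u32), u64), isize> = BTreeMap::new();
        for &(data, t, diff) in self.batches.iter().flatten() {
            *merged.entry((data, t.max(self.since))).or_insert(0) += diff;
        }
        let batch: Vec<Update> = merged
            .into_iter()
            .filter(|&(_, diff)| diff != 0)
            .map(|((data, t), diff)| (data, t, diff))
            .collect();
        self.batches = vec![batch];
    }

    /// What a newly attached dataflow would see first: the history as
    /// currently compacted.
    fn import(&self) -> Vec<Update> {
        self.batches.iter().flatten().copied().collect()
    }
}

fn main() {
    let mut trace = ToyTrace::new();
    trace.push(vec![((0, 30), 0, 1), ((0, 76), 0, 1)]);
    trace.push(vec![((0, 30), 1, -1), ((1, 62), 1, 1)]);
    trace.push(vec![((1, 62), 2, -1)]);
    println!("full history: {} updates", trace.import().len());
    trace.downgrade_and_compact(2);
    println!("after compaction: {:?}", trace.import());
}
```

Never downgrading corresponds to the "keep everything, forever" option; downgrading promptly keeps the imported history proportional to the current collection rather than to its past.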
That seems like a sane thing to do. Why is it so awesome?
Let's look at the other computation we might have attached to our running edges trace.
// [...]
let mut roots = worker.dataflow(|scope| {
let edges = edges.import(scope);
let (input, roots) = scope.new_input();
let roots = roots.as_collection()
.map(|x| (x, 0));
roots.iterate(|dists| {
let edges = edges.enter(&dists.scope());
let roots = roots.enter(&dists.scope());
dists.join_arranged(&edges, |_k,l,d| (*d, l+1))
.concat(&roots)
.group_u(|_, s, t| t.push((s[0].0, 1)))
});
input
});
This is a breadth-first distance labelling computation. When fed a pile of accumulated historical data, this is what most people would call "batch processing". However, we are using the exact same infrastructure to perform the batch computation as we are to subsequently maintain the computation as the historical data starts to change.
Not only is it awesome to spin up a batch computation against live data, it is actually faster to do things this way. The bfs computation gets its edge collection pre-indexed, and that is a bunch of work it doesn't have to do again. I fired up the computation and measured the time it takes to do bfs on a 10m node 100m edge graph, both from scratch (in which case it gets billed for the time to arrange the edges) and when attaching to an existing indexed stream.
from scratch: Duration { secs: 10, nanos: 82284689 }
attachment: Duration { secs: 6, nanos: 149146638 }
Sweet, apparently we save a bunch of time by re-using existing indexed state. Who would have guessed? Plus, though not reported here, the computation keeps running with low latency updates to the iterative distance computation.
Also, the additional memory footprint? Nominal: the distances for the 10m nodes, rather than anything proportional to the 100m edges.
As I understand these things, this is probably an instance of the "Kappa Architecture", in which you don't bother with "batch processors" and just use a non-crap stream processor for everything. Differential dataflow improves on the proposal in the link up above, which just proposes to stash historical data and replay everything you need, by maintaining indexed representations and starting from them. If you wanted to know the two-hop neighbors of your social graph, you should just grab a pointer to the shared state rather than replay a month of social interactions.
The Kappa Architecture contrasts itself with the "Lambda Architecture", which apparently means "tape Hadoop and Storm together". There are other "architectures", with other Greek letters. The link above contains an excellent example of simultaneous self-deprecation and vicious knife twisting:
Maybe we could call this the Kappa Architecture, though it may be too simple of an idea to merit a Greek letter.
Nailed it.
I don't think we need more architectures. Batch and streaming computations can be exactly the same thing. A batch computation is just a streaming computation where all of the input updates have the same timestamp, and then cease. You don't need a hybrid architecture, you just need to build a sufficiently capable stream processor that doesn't fall over when you get a massive pile of data all at once (see postscript notes).
Differential dataflow is already faster than many batch processors at batch computation, and it is faster than many stream processors at stream computation. It does both of these at the same time using the same framework. It does cool shit like iterative computation. It's got sweet-ass index re-use. I don't want to read any more about data processing systems that can only manage a strict subset of these things.
And if any of you are starting to come around and want to help, let me know. We are over here:
https://github.com/frankmcsherry/differential-dataflow
What actually happens when we start up a new "batch" computation? We fire what might be billions of edges off to a newly minted join operator, who .. does what exactly?
Let's consider the case of breadth-first distance labeling up above. The dataflow is brought into existence, and let's imagine that the only data currently in play are (i) a few roots in the roots collection, and (ii) a few batches of enormous numbers of edges in the edges collection.
The join method first arranges its inputs, and in this case that means arranging the roots and just chilling with the pre-arranged edge data. The join operator itself drains batches from each of its two inputs. For each input batch, the operator joins it with the historical data on the other input. For example, we will get an initial batch of roots, and this will be joined against the full historical data for edges, involving a few look-ups for each root. We will also get an initial few mega-batches of edge data, and they get joined against the full historical data of the roots, involving ..
Ok, this is interesting! The way join is written at the moment, it traverses each element of each input batch, and tries to find the key in the history of the other input. Here, this would mean traversing all edges looking things up in the roots set. That would be a huge waste of time, so let's fix that.
Deep in the bowels of the join operator, we find code that looks like so:
while batch.key_valid() && effort < *fuel {
trace.seek_key(batch.key());
if trace.key_valid() && trace.key() == batch.key() {
// do some work
}
batch.step_key();
}
Notice that this walks through all of batch, even if trace is really small or maybe empty. The code can be improved, thusly:
while batch.key_valid() && trace.key_valid() && effort < *fuel {
match trace.key().cmp(batch.key()) {
Ordering::Less => trace.seek_key(batch.key()),
Ordering::Greater => batch.seek_key(trace.key()),
Ordering::Equal => {
// do some work
trace.step_key();
batch.step_key()
}
}
}
Now when the keys are different we use the random-access awesomeness to zip forward until we meet the key of the other, symmetrically for both batch and trace. We don't have to worry about spending massive amounts of time scanning edges; we will use the available roots to just leap over most of them.
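The same loop can be tried out on plain sorted slices. The sketch below assumes keys are sorted and deduplicated u32 values, and stands in for the cursor's seek_key with a hypothetical `seek` helper that gallops (exponential search, then binary search) instead of stepping one key at a time. It illustrates the symmetric seeking, not the actual cursor implementation.

```rust
use std::cmp::Ordering;

/// Returns the first index at or after `pos` whose key is >= `target`,
/// using exponential then binary search rather than a linear scan.
/// Requires `keys` sorted and `pos <= keys.len()`.
fn seek(keys: &[u32], mut pos: usize, target: u32) -> usize {
    let mut step = 1;
    while pos + step < keys.len() && keys[pos + step] < target {
        pos += step;
        step <<= 1;
    }
    // The answer now lies within keys[pos .. pos + step + 1], clamped.
    let hi = (pos + step + 1).min(keys.len());
    pos + keys[pos..hi].partition_point(|&k| k < target)
}

/// Returns the keys present in both sorted, deduplicated inputs,
/// leaping over runs of keys that the other side does not have.
fn matching_keys(batch: &[u32], trace: &[u32]) -> Vec<u32> {
    let (mut b, mut t) = (0, 0);
    let mut out = Vec::new();
    while b < batch.len() && t < trace.len() {
        match trace[t].cmp(&batch[b]) {
            Ordering::Less => t = seek(trace, t, batch[b]),
            Ordering::Greater => b = seek(batch, b, trace[t]),
            Ordering::Equal => {
                out.push(batch[b]); // "do some work"
                b += 1;
                t += 1;
            }
        }
    }
    out
}

fn main() {
    let edge_keys: Vec<u32> = (0..1_000).collect(); // a big batch of edges
    let root_keys = [3, 500, 2_000];                // a tiny trace of roots
    println!("{:?}", matching_keys(&edge_keys, &root_keys));
}
```

With three roots, the loop touches a logarithmic number of edge keys per root instead of all thousand.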
Hey what is that effort < *fuel thing, anyhow?
This was a pre-optimization, made a while back, that turns out to be hugely important here.
The join operator does not actually process all of its input batches, because that would be a whole lot of work to do at once. Instead, it prepares work items, each of which has a method work that takes a fuel: usize parameter indicating the amount of work it should do before returning control. Rather than produce some enormous pile of output, the join operator produces roughly a million-ish outputs and then returns (technically, it returns once a key causes it to produce more than one million outputs, so if some key is responsible for billions of outputs, erm. This can be fixed!).
This was important for differential dataflow because joins can produce massive outputs that can be coalesced down, but if the outputs are all sitting in output buffers nothing good happens. The join operator needs to return control periodically to let workers drain its output buffers, and move the data along to the operators that can coalesce it down.
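The fuel idea is simple enough to sketch in plain Rust. The `WorkItem` below is a made-up stand-in (a cross product of two small lists, rather than a real join of a batch against a trace), but it has the same shape: do work until the fuel is spent, remember where you stopped, and give control back. Like the join described above, it only checks fuel between "keys", so a single call can overshoot.

```rust
/// A deferred piece of join-like work: pair every left value with every
/// right value, resumable across calls.
struct WorkItem {
    left: Vec<u32>,
    right: Vec<u32>,
    next_left: usize, // index of the next left value to process
}

impl WorkItem {
    /// Emits output until at least `fuel` records have been produced or
    /// the work is finished. Fuel is only checked between left values.
    fn work(&mut self, fuel: usize, output: &mut Vec<(u32, u32)>) {
        let mut effort = 0;
        while self.next_left < self.left.len() && effort < fuel {
            let l = self.left[self.next_left];
            for &r in &self.right {
                output.push((l, r));
            }
            effort += self.right.len();
            self.next_left += 1;
        }
    }

    /// True once every left value has been processed.
    fn done(&self) -> bool {
        self.next_left == self.left.len()
    }
}

fn main() {
    let mut item = WorkItem {
        left: (0..10).collect(),
        right: (0..4).collect(),
        next_left: 0,
    };
    let mut round = 0;
    while !item.done() {
        let mut output = Vec::new();
        item.work(10, &mut output);
        // A worker would drain `output` downstream here before resuming.
        round += 1;
        println!("round {}: {} records", round, output.len());
    }
}
```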
The optimization is also important if you find yourself in a hybrid batch-stream computation, where some folks are expecting low latency, and some jerk has just come along and proposed doing bfs RIGHT NOW. Each operator should yield after bounded amounts of work to ensure responsiveness for other parts of the stream computation. At the moment just join does this, but group can easily learn to, using the same techniques.
You know what is great? There are probably lots of other issues that make batch computation disruptive to stream computation. I'm now in a position to check that stuff out, and sort things out where they are broken. I'm going to do some MF'ing research. As far as I can tell, there is no reason you shouldn't be able to attach a batch computation to an existing differential dataflow stream and expect smooth sailing.