I'm a bit stuck on an algorithmic problem in the innards of differential dataflow. It seems totally doable, with the right point of view, but in the meantime I thought I would write about it to explain the problem. The problem really seems to distill the essence of what makes differential dataflow interesting and different from stream processors with totally ordered time.
Let's start with forgetting everything you know about differential dataflow. For many of you, this will be pretty easy. This post has nothing to do with complex dataflows, distributed computation, `join` or `iterate` or anything other than the `group` operator. We aren't even going to have keys for this `group` operator; it is that elemental.
Imagine we have been handed a function `logic` from some type `V1` to some type `V2`. This function is totally inscrutable to us; we can only evaluate it on inputs and see what it produces as output.
Ok, here is the tricky part: instead of just getting one input, for which we can just compute the output, the inputs are going to change. Some jerk is going to feed us a stream of changes to the input, in the form of updates that look like `(val1, time, diff)`, where `val1` is some element of `V1`, `time` is the moment in time when the change happens (more on this in a bit), and `diff` is the amount by which the count of `val1` changes (think of this as `+/-1`).
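To fix notation for some sketches later in the post, here is how I would write these down in Rust; the names `Update` and `Logic` are mine, purely for illustration, not actual differential dataflow types.

```rust
/// An update is a triple: the value, the time at which its count changes, and
/// the amount by which it changes (think +/-1). Illustrative names and types.
type Update<V, T> = (V, T, isize);

/// The inscrutable `logic`, from `V1` to `V2`: all we may do is call it on inputs.
type Logic<V1, V2> = Box<dyn Fn(V1) -> V2>;
```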
Our goal is to produce a corresponding stream of changes to the output, which should take the form `(val2, time, diff)` and indicate how and when the output experiences changes.
The easiest case to get our heads around is the case where times are totally ordered, meaning that for any two distinct times `t1` and `t2`, either `t1` comes before `t2` or `t2` comes before `t1`. You might have thought this was pretty obviously going to be the case, and so calling this the "easiest case" might worry you a bit.
When times are totally ordered, we are pretty set. For any stream of input changes, if we sort them by time we can walk sequentially through the changes and re-apply `logic` whenever there is a change in the input. If the output changes (it may not!) we should emit an output change at the corresponding time.
This is pretty easy, and is what lies at the heart of many stream processors.
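Here is a rough sketch of that sweep, under some assumptions of mine: integer times, updates as `(value, time, diff)` triples, and `logic` applied element-wise to the accumulated multiset (which papers over the collection-valued story that comes up later in the post).

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// One pass over totally ordered updates: sort by time, maintain the accumulated
/// input as we go, and emit an output change whenever the accumulated output would
/// change. A sketch only; `u64` times and element-wise `logic` are assumptions.
fn sweep<V1, V2>(
    mut input: Vec<(V1, u64, isize)>,
    logic: impl Fn(&V1) -> V2,
) -> Vec<(V2, u64, isize)>
where
    V1: Clone + Eq + Hash,
    V2: Clone + Eq + Hash,
{
    input.sort_by_key(|&(_, time, _)| time);

    let mut input_acc: HashMap<V1, isize> = HashMap::new();  // input accumulated so far
    let mut output_acc: HashMap<V2, isize> = HashMap::new(); // output accumulated so far
    let mut changes = Vec::new();

    let mut idx = 0;
    while idx < input.len() {
        let time = input[idx].1;
        // Fold in every input change occurring at this time.
        while idx < input.len() && input[idx].1 == time {
            *input_acc.entry(input[idx].0.clone()).or_insert(0) += input[idx].2;
            idx += 1;
        }
        // Re-derive what the output should accumulate to at this time.
        let mut should_be: HashMap<V2, isize> = HashMap::new();
        for (val, &count) in input_acc.iter() {
            if count != 0 {
                *should_be.entry(logic(val)).or_insert(0) += count;
            }
        }
        // Emit the difference between that and the previously accumulated output.
        for (val, &count) in should_be.iter() {
            let prev = output_acc.get(val).copied().unwrap_or(0);
            if count != prev {
                changes.push((val.clone(), time, count - prev));
            }
        }
        for (val, &prev) in output_acc.iter() {
            if prev != 0 && !should_be.contains_key(val) {
                changes.push((val.clone(), time, -prev));
            }
        }
        output_acc = should_be;
    }
    changes
}
```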
What if the times are not totally ordered, meaning that two distinct times `t1` and `t2` can satisfy at most one of `t1 < t2` and `t2 < t1`, but they don't have to satisfy either? Two distinct times can just be "meh" with respect to each other.
That's the only change we are going to make. Sound hard?
At its core, the hard part of differential dataflow is taking a collection of input updates at partially ordered times, and producing the corresponding output updates. It is hard. Let's set the stage by walking through some of the hard parts:
With a total order, we just have values, times, and what the change was (addition or deletion). If we want to know what the value is at a given time, we accumulate all of the changes up to that time, and whatever we have is the value at that time.
In fact, this definition will also be true for differential dataflow, though as we talk through it you may have some intuitional panic. I do too; it's natural.
If we have a collection of changes of the form `(val1, time, diff)`, then to determine the value at `t` we accumulate up all changes for which `time <= t`. This is just like with totally ordered times, except that your mental picture of them being in a line doesn't work any more.
Correspondingly, we need to produce output updates `(val2, time, diff)` so that the output collection always accumulates to the value resulting from `logic` applied to the accumulated input collection. Again, it just works, except that if you try and picture it your brain will hurt.
Both input and output updates are accumulated into collections using a time threshold `t`. We just add up all the differences for each of the values.
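Concretely, accumulation is nothing fancier than this sketch, where `less_equal` stands in for whatever comparison the partially ordered time type offers:

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Accumulate all updates `(val, time, diff)` whose time is `<= t` in the partial
/// order, returning the net count for each value.
fn accumulate<V, T>(
    updates: &[(V, T, isize)],
    t: &T,
    less_equal: impl Fn(&T, &T) -> bool,
) -> HashMap<V, isize>
where
    V: Clone + Eq + Hash,
{
    let mut acc = HashMap::new();
    for (val, time, diff) in updates {
        if less_equal(time, t) {
            *acc.entry(val.clone()).or_insert(0) += *diff;
        }
    }
    acc.retain(|_, count| *count != 0); // drop values whose counts cancel out
    acc
}
```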
Totally ordered times had a very appealing and intuitive property: the output can only change at those times when the input changes. This is no longer true for partially ordered times. Consider the following two input changes
("carrot", (1, 3), +1)
("turnip", (2, 2), +1)
Each of these records has its count increased by one at two incomparable times: neither `(1, 3)` nor `(2, 2)` comes before the other. You might say: "this doesn't add up to one value", which is true, and indeed I've oversimplified (we actually have collection-valued inputs and outputs, but I wanted to simplify; oops).
Something interesting happens at the time `(2, 3)`, which is the first time that includes both updates in its accumulation. This time doesn't appear in the input updates, but it is a time when the input collection changes. This wasn't a problem with totally ordered times, because "the first time that includes both updates" is just the later of the two times. Not here.
There is a theorem that one can prove, which is that a collection can only change at a well-defined set of times: those defined by taking a subset of times at which its input changes and determining the first moment at which all the updates come into effect. This is called the `join` of the associated times, and if you are math-y this means when I said "partial order" I really meant "join semi-lattice". This could probably be relaxed, but let's assume it for now.
In the example above where we have pairs of numbers for times, this means we may have changes at any times that are the component-wise maximums of some set of input times. Thinking for a bit, this could be a lot larger than the number of input changes.
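For the pairs-of-numbers example, the time type I have in mind looks roughly like this sketch: component-wise comparison, and component-wise maximum for the join. It is illustrative rather than the actual timestamp type.

```rust
/// A pair-of-counters time, ordered component-wise. Two such times can be
/// incomparable: neither (1, 3) <= (2, 2) nor (2, 2) <= (1, 3).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Pair(u64, u64);

impl Pair {
    /// `self` comes at or before `other` only if both coordinates do.
    fn less_equal(&self, other: &Pair) -> bool {
        self.0 <= other.0 && self.1 <= other.1
    }
    /// The join: the earliest time at or after both `self` and `other`,
    /// which here is the component-wise maximum.
    fn join(&self, other: &Pair) -> Pair {
        Pair(self.0.max(other.0), self.1.max(other.1))
    }
}

// For example, Pair(1, 3).join(&Pair(2, 2)) == Pair(2, 3).
```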
These observations give rise to a pretty simple algorithmic framework:
1. Determine all times at which the output collection could change.
2. For each time `t`, in some order not incompatible with the partial order,
a. Accumulate the input value from updates with `time <= t`.
b. Apply `logic` to get an output value.
c. Accumulate and subtract output updates with `time <= t`.
d. Record the difference in outputs as the output at `t`.
The main tricky thing we had to do here was go through the times in some order that wasn't incompatible with the partial order. This is because, to correctly determine the output changes, we need to have all of the "prior" output changes to hand.
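Here is a deliberately naive Rust rendering of that framework, with `less_equal` and `join` standing in for the time type's partial order and least upper bound, and with my element-wise reading of `logic`. It re-accumulates everything from scratch at each candidate time, which is exactly the sloppiness complained about below.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Naive framework: close the input times under `join`, visit them in an order
/// compatible with the partial order, and at each time compare what the output
/// should accumulate to against what it currently accumulates to.
fn update_output<V1, V2, T>(
    input: &[(V1, T, isize)],
    logic: impl Fn(&V1) -> V2,
    less_equal: impl Fn(&T, &T) -> bool,
    join: impl Fn(&T, &T) -> T,
) -> Vec<(V2, T, isize)>
where
    V1: Clone + Eq + Hash,
    V2: Clone + Eq + Hash,
    T: Clone + Eq,
{
    // 1. Determine all times at which the output could change: the input times,
    //    closed under `join` (the worklist idea discussed later in the post).
    let mut times: Vec<T> = Vec::new();
    for (_, time, _) in input {
        if !times.contains(time) { times.push(time.clone()); }
    }
    let mut index = 0;
    while index < times.len() {
        for lower in 0..index {
            let j = join(&times[lower], &times[index]);
            if !times.contains(&j) { times.push(j); }
        }
        index += 1;
    }

    // 2. Visit the times in some order not incompatible with the partial order.
    //    Sorting by "how many candidate times are <= me" is one (slow) way to get one.
    let mut keyed: Vec<(usize, T)> = times
        .iter()
        .map(|t| (times.iter().filter(|s| less_equal(*s, t)).count(), t.clone()))
        .collect();
    keyed.sort_by_key(|&(rank, _)| rank);

    let mut output: Vec<(V2, T, isize)> = Vec::new();
    for (_, t) in keyed {
        // a. Accumulate the input updates with `time <= t`.
        let mut input_acc: HashMap<V1, isize> = HashMap::new();
        for (val, time, diff) in input {
            if less_equal(time, &t) {
                *input_acc.entry(val.clone()).or_insert(0) += *diff;
            }
        }
        // b. Apply `logic` to get what the output should accumulate to at `t`.
        let mut should_be: HashMap<V2, isize> = HashMap::new();
        for (val, count) in input_acc.iter() {
            if *count != 0 {
                *should_be.entry(logic(val)).or_insert(0) += *count;
            }
        }
        // c. Accumulate the output updates already issued with `time <= t`.
        let mut currently: HashMap<V2, isize> = HashMap::new();
        for (val, time, diff) in output.iter() {
            if less_equal(time, &t) {
                *currently.entry(val.clone()).or_insert(0) += *diff;
            }
        }
        // d. Record the difference between (b) and (c) as the output change at `t`.
        for (val, count) in should_be.iter() {
            let have = currently.get(val).copied().unwrap_or(0);
            if *count != have {
                output.push((val.clone(), t.clone(), *count - have));
            }
        }
        for (val, have) in currently.iter() {
            if *have != 0 && !should_be.contains_key(val) {
                output.push((val.clone(), t.clone(), -*have));
            }
        }
    }
    output
}
```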
That algorithm works, and is roughly how the kernel of differential dataflow behaves. However, there are several under-specified components, and the naive implementations aren't as fast as we want.
We would really like our implementations to be "linear" or nearly so, because our plan is to process unbounded changes to the input collection by repeatedly applying our solution to bounded chunks of input changes, and we'd like to benefit (or at least not suffer) from the flexibility of working on larger batches. If nothing else, it would seem that we could always fall back on execution on smaller chunks.
The first thing we need to do is determine all of the times that can result from joining arbitrary subsets of times. We also need to not take exponential time doing it, which is probably what you were thinking, weren't you?
There is a fairly simple better algorithm, which maintains a worklist of new times, and repeatedly joins each with the existing times, putting any newly discovered times into the worklist.
```rust
let mut index = 0;
while index < times.len() {
    for lower in 0 .. index {
        let join = times[lower].join(&times[index]);
        if !times.contains(&join) {
            times.push(join);
        }
    }
    index += 1;
}
```
Ok, this is still pretty horrible when we have lots of times. For each element in the list we scan through all prior elements, doing a test that takes linear time (in this implementation). This means we should expect to perform cubic work. If we optimize the `contains` test to use a `HashMap`, or something similar, this drops down to quadratic time.
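As a sketch of that optimization, keeping a `HashSet` of already-seen times next to the list makes each membership test expected constant time (assuming the time type is hashable):

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Close `times` under pairwise `join`, with a HashSet making the membership test
/// expected constant time rather than a linear scan. Still quadratic in the number
/// of produced times, but no longer cubic.
fn close_under_join<T>(mut times: Vec<T>, join: impl Fn(&T, &T) -> T) -> Vec<T>
where
    T: Clone + Eq + Hash,
{
    let mut seen: HashSet<T> = times.iter().cloned().collect();
    let mut index = 0;
    while index < times.len() {
        for lower in 0..index {
            let j = join(&times[lower], &times[index]);
            if seen.insert(j.clone()) {
                times.push(j);
            }
        }
        index += 1;
    }
    times
}
```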
This is not yet linear in the number of produced times.
For both the input and the output, we need to accumulate changes prior to various specified times `t`. We can certainly do this by scanning lists of updates and testing the `time` field for each, putting all the updates in either a list that we sort or into a `HashMap`-like structure.
In either case, the amount of work we would do is proportional to the number of changes multiplied by the number of times we need to process. This will be at least quadratic in the number of changes, because each change contributes at least one time.
This is not yet linear in the number of input and output changes.
I've said we want to try and do work that is linear in the number of inputs and number of outputs. Can we even do that?
I'm not actually sure.
We do know how to do this when the partial order is a total order. In this case, each of the steps amounts to sorting elements by time and applying a sweep through the times that maintains accumulated information, and never returns to prior times. We also believe (or I believe) that we could implement the updates using smaller chunks of changes (e.g. break the input changes into two halves; do one, then do the next).
I'm optimistic that we should be able to do something similar when the partial orders have some structure that make them look a bit like total orders. I've not really nailed this down yet, though. Or rather, I think I've nailed down the "accumulation" part, but not the "determination" part.
Imagine if you will that our collection of partially ordered updates can be partitioned into some number of totally ordered sequences of updates. That is, each sequence is totally ordered (each change happens at a time less than the times of subsequent updates), but there are no promises between the sequences.
I think these are called "chain decompositions" in the order literature.
In differential dataflow, we often have relatively few disjoint chains, because while time runs arbitrarily far ahead, there are only so many iterations that the computation actually encounters. We could form a chain rooted at each of these iterations, with as many rounds as exist in the history of the computation. Although the computation may run forever (-ish), we won't need more chains than the deepest round of iteration ever performed.
Why are chains handy?
- Accumulation: If we organize our updates into chains, then as we move strictly forward in time the set of changes in each chain we must include only grows. As long as we strictly advance time (non-trivial in a partial order), we only need one pass through each chain, doing a linear amount of work, with a factor of the number of chains. (There is a sketch of this just after the list.)

  Instead of taking time equal to the number of updates multiplied by the number of times to process, we take time proportional to the number of chains in the updates multiplied by the number of chains in the times to process. This could be just one when things are simple, and it degrades smoothly as things become less simple.
- Determination: We would similarly expect that chain decompositions would help us find the closure of a set of times. For sure, if the times form a single chain, totally ordered, then there is no work to do: a chain is already closed under join.

  If we have more than one chain, as each is closed we can just take the product of various pairs of chains. We do not need to "re-close" any sets, which is a relief. Unfortunately, "the product of various pairs of chains" is already quadratic, even if the output isn't, and we don't want that.
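Here is a sketch of the accumulation side with per-chain cursors; it assumes each chain is sorted and totally ordered, that `less_equal` is the time type's partial order, and that each queried time is `>=` all previously queried times.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Accumulation over a chain decomposition. Each chain is a totally ordered run of
/// updates, sorted by time; `cursors[i]` records how much of chain `i` has already
/// been folded into `acc`.
struct ChainAccumulator<V, T> {
    chains: Vec<Vec<(V, T, isize)>>,
    cursors: Vec<usize>,
    acc: HashMap<V, isize>,
}

impl<V: Clone + Eq + Hash, T> ChainAccumulator<V, T> {
    fn new(chains: Vec<Vec<(V, T, isize)>>) -> Self {
        let cursors = vec![0; chains.len()];
        ChainAccumulator { chains, cursors, acc: HashMap::new() }
    }

    /// Fold in every not-yet-seen update with `time <= t`, and return the counts.
    /// Because each chain is totally ordered and `t` never retreats, each cursor
    /// only moves forward: one pass over the updates across all calls, plus a
    /// little probing per chain per call.
    fn advance_to(&mut self, t: &T, less_equal: impl Fn(&T, &T) -> bool) -> &HashMap<V, isize> {
        for (chain, cursor) in self.chains.iter().zip(self.cursors.iter_mut()) {
            while *cursor < chain.len() && less_equal(&chain[*cursor].1, t) {
                let (val, _, diff) = &chain[*cursor];
                *self.acc.entry(val.clone()).or_insert(0) += *diff;
                *cursor += 1;
            }
        }
        &self.acc
    }
}
```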
Unfortunately, that's as far as I am at the moment. It seems like there could be a nice linear-time algorithm when the number of chains is small, but I haven't found it quite yet. The problem is a little different, in that unlike the accumulation stage, the determination stage can actually produce quadratic (or more) output. We've allowed ourselves to take time in that case, but the structure of the algorithms appears fundamentally different; one can't always just swing through the chains and ignore the history (example grid forthcoming).
Here is the simplification that I'm trying to sort out now.
Given two totally ordered sequences `A` and `B` of elements from a partial order, how efficiently can you determine the set of elements `join(a, b)` resulting from joining pairs from the sequences?
Imagine

```
old: (0, 0), (0, 1), .., (0, i)
new: (0, 0), (1, 0), .., (i, 0)
```

The correct result contains each possible `(i, j)` pair, a quadratic number of them. We could justifiably take quadratic time to determine this, because the size of the output is quadratic in `i`. It is pretty easy to do in quadratic time, because we just swing through `old` and `new` and join each pair of elements. Win!
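The quadratic version is as simple as it sounds; here is a sketch, generic in the time type, that joins every pair and deduplicates:

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// The easy quadratic answer: join every element of `old` with every element of
/// `new`, deduplicating the results. Fine when the output really is quadratic,
/// wasteful when (as in the next example) it is only linear.
fn join_chains<T>(old: &[T], new: &[T], join: impl Fn(&T, &T) -> T) -> Vec<T>
where
    T: Clone + Eq + Hash,
{
    let mut seen = HashSet::new();
    let mut result = Vec::new();
    for a in old {
        for b in new {
            let j = join(a, b);
            if seen.insert(j.clone()) {
                result.push(j);
            }
        }
    }
    result
}
```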
Imagine

```
old: (0, 0), (0, 1), .., (0, i)
new: (1, 0), (1, 1), .., (1, i)
```

The correct result contains a linear number of outputs of the form `(1, i)`, just like `new`. We would really like to determine this in a linear amount of time. We could tolerate some slight super-linearity, like for sorting and such.
Now, there are probably several algorithms that take linear time on these specific inputs, but imagine that both `old` and `new` have some random elements dropped out; we actually need the correct answer in this case too.
This is where I am now. If I can sort out how to determine the interesting times efficiently, meaning something linear-ish in the number of produced times, then we should have a differential dataflow implementation that can process arbitrarily long histories at a go. This lies at the heart of giving differential dataflow workers big chunky pieces of work to do, which is how we increase throughput and scaling, without asking the user to batch their changes any differently.
Wouldn't that be great?