This post results from discussions with Renato Javier Marroquin Mogrovejo, who some months back asked a relatively subtle and sophisticated question (paraphrased):
Q: "Can you use differential dataflow's weird timestamps to do multi-temporal analytics?"
--Renato Javier Marroquin Mogrovejo
which had a relatively simpler answer
A: Tries it out. "Sure looks like it!"
--Frank
It turns out the justification for the answer was a bit less simple, and that is what we are going to talk through today. If you want to follow along (please don't leap ahead; you will only hurt your brain) we are going to look at experiments/multitemporal.rs.
This is a great first question, and I'm not sure there is a 100% clear answer that isn't developed through examples. By the end of the post you should have a better understanding, where specifically you'll see how it generalizes concepts like "late data" and "temporal analytics", while also introducing things that may not have clear names (e.g. data integration from domains with decoupled clocks).
If you've been reading this blog for really any amount of time, you may have noticed that I do a bit of work on stream processing systems, in particular timely stream processing. As in, streams of data bearing timestamps, each of which should be thought of as occurring at some moment in time.
In these systems, as in conventional theories of the world around us, time generally moves forward.
Not today.
Today time will move not backward. Which is different, slightly.
This is probably the most common case. Most computations you are familiar with move forward, and this is generally quite a safe direction for time to move. Let's look at a concrete example.
Differential dataflow is this computational framework in which you define a functional computation using dataflow, and then you repeatedly update the input collection and the framework produces the correspondingly updated outputs.
In this framing "repeatedly" and "update" mean you change the collection relative to some prior collection. Without trying to get too heavy too fast, let's think about a sequence of updates to the input of a differential dataflow computation.
I'm going to use the example of a reachability computation that counts the number of nodes reachable from node zero along directed input edges, as we change the input edge set. This example isn't meant to be a wildly complicated computation, but it is meant to be non-trivial, so that you won't be inclined to think you can brute-force the mathematics.
Let's imagine we start with a random graph on 10,000 nodes with 20,000 edges, and we then perform 10 rounds of updates of 100 changes to the set of edges.
time: 0 input changes: 20,000
time: 1 input changes: 200
time: 2 input changes: 200
time: 3 input changes: 200
time: 4 input changes: 200
time: 5 input changes: 200
time: 6 input changes: 200
time: 7 input changes: 200
time: 8 input changes: 200
time: 9 input changes: 200
Here are some updates, or numbers of updates I suppose, in order. Having supplied them, and committed to not changing these times any more, differential dataflow can swing into action and report the number of output changes (in the number of nodes reachable from node zero).
time: 0 changes: 8049
time: 1 changes: -9
time: 2 changes: -39
time: 3 changes: 5
time: 4 changes: 10
time: 5 changes: 11
time: 6 changes: 26
time: 7 changes: -5
time: 8 changes: -2
time: 9 changes: 3
Seems like many of the nodes are initially reachable, and then the number wobbles down and then up and then down and .. Right. It changes. If you want to see more of this sort of thing in action, just about every differential dataflow example program runs an experiment like this, making input changes and reporting output changes.
The specific way you interact with differential dataflow is to hold on to input capabilities, each corresponding to a specific time. As long as you hold a capability, you can continue to change the input at that time (or any future time). If you would like the computation to move forward, you either need to downgrade (advance) or drop input capabilities, which acts as a proof to differential that you will make no further changes.
Obviously you can't do that. Why would you think you could do that?
In differential, like in the world around us, what's done is done. Once you move time forward there is no going back. Differential in particular uses your indelible forward temporal slog to clean up any now-irrelevant details about the past. Once we advance to time 4, the distinctions between times 0 and 1 are hardly relevant, are they? Differential thinks so, and will happily compact changes at each to reduce its memory footprint.
When time moves forward in differential it is permanent; you can't move time backwards.
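As a rough illustration of why that compaction is safe, here is a sketch for plain integer times. This is not differential dataflow's actual implementation: the `compact` and `accumulate` helpers and the `(data, time, diff)` tuple layout are made up for this example. Once the frontier has advanced to 4, every earlier update can be re-stamped as time 4 and merged with its neighbors, and nobody looking from time 4 onward can tell the difference.

```rust
use std::collections::BTreeMap;

/// An update: (data, time, diff). Hypothetical layout for this sketch.
type Update<'a> = (&'a str, u64, i64);

/// Re-stamp every update earlier than `frontier` to `frontier`, then merge
/// updates with equal (data, time) and drop the ones that cancel out.
fn compact<'a>(updates: &[Update<'a>], frontier: u64) -> Vec<Update<'a>> {
    let mut merged: BTreeMap<(&'a str, u64), i64> = BTreeMap::new();
    for &(data, time, diff) in updates {
        *merged.entry((data, time.max(frontier))).or_insert(0) += diff;
    }
    merged
        .into_iter()
        .filter(|&(_, diff)| diff != 0)
        .map(|((data, time), diff)| (data, time, diff))
        .collect()
}

/// Sum the diffs for `data` over all updates at times less or equal to `time`.
fn accumulate(updates: &[Update<'_>], data: &str, time: u64) -> i64 {
    updates
        .iter()
        .filter(|(d, t, _)| *d == data && *t <= time)
        .map(|(_, _, diff)| *diff)
        .sum()
}
```

For example, an insertion of "a" at time 0 followed by its removal at time 1 vanishes entirely once compacted to frontier 4, and `accumulate` gives the same answers before and after compaction for any time from 4 onward. What is lost is only the ability to ask about times 0 through 3.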
Times in differential dataflow don't need to be totally ordered; we can use elements of a partially ordered set, for which two times may simply be incomparable. Neither comes before or after the other.
For example, instead of the integers we can use pairs of integers, (x, y), where two pairs compare thusly:
(a,b) <= (x,y) iff a <= x and b <= y
Ok, um. neat. Why would we do such a thing, other than mathematical novelty?
Where we previously had times that went 0, then 1, then 2, let's consider instead producing updates at the times
(0,0), (0,1), (0,2)
This is a sequence of times that move forward, in that each is <= the next. In fact, as we move forward through them, differential dataflow will produce the same outputs as for 0, 1, and 2, just at times that have a zero in front of the numbers.
time: (0, 0) changes: 8049
time: (0, 1) changes: -9
time: (0, 2) changes: -39
time: (0, 3) changes: 5
time: (0, 4) changes: 10
time: (0, 5) changes: 11
time: (0, 6) changes: 26
time: (0, 7) changes: -5
time: (0, 8) changes: -2
time: (0, 9) changes: 3
We have reproduced the same differential dataflow computation as up above, just with more complexity! Nice one. That's great. Really no different. Just every time has another eight bytes of zero.
Except, I suppose, that in principle we could retain a capability at (1,0) and then make some 100 changes at (1,0) just ..types.. like ..types.. so ..enter
time: (1, 0) changes: 34
Whoa, crazy. Let's pretend this is so shocking we accidentally downgrade our (1,0) capability to (2,0).
time: (1, 1) changes: -1
time: (1, 4) changes: 1
time: (1, 5) changes: -1
time: (1, 6) changes: 3
time: (1, 8) changes: -4
time: (1, 9) changes: 1
ZOMG ... WHAT ... HAPPENED?!?
Huh.
It's not backwards. It's not really forwards either, is it?
The time (1,0) comes after (0,0) but it does not come after (0,1) nor after (0,2). The times (1,1) and (1,2) do come after those two, respectively (and the second after the first). More generally, (1,j) does come after (0,j), but not after (0,j+1).
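If you would like to check those comparisons mechanically, here is a minimal sketch of the pair ordering in plain Rust. The names `Time`, `less_equal`, and `incomparable` are chosen for this example and are not meant to be the types differential dataflow itself uses.

```rust
/// A pair-of-integers timestamp, as in the text.
type Time = (u64, u64);

/// The product partial order: (a,b) <= (x,y) iff a <= x and b <= y.
fn less_equal(a: Time, b: Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Two times are incomparable when neither is less or equal to the other.
fn incomparable(a: Time, b: Time) -> bool {
    !less_equal(a, b) && !less_equal(b, a)
}
```

With these definitions `less_equal((0,0), (1,0))` holds, while `(1,0)` and `(0,1)` are incomparable. One thing to watch out for: Rust's built-in comparison on tuples is lexicographic, which is a total order, so `(0,1) <= (1,0)` evaluates to true there. That is not the order we want here, which is why the sketch spells out its own comparison.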
We added 100 new edges at (1,0) and got those changes up above. What do they mean? Seriously, we only added edges and yet have subtractions in numbers of reachable nodes, it seems. Sideways is some weird shit, man.
Let's do it again.
time: (2, 3) changes: 38
time: (2, 6) changes: -2
time: (2, 9) changes: -5
This time I put 100 edges in at (2,3), which creates some output changes at (2,3) but none before, and then more subtractions. What is up with these subtractions?
time: (3, 1) changes: 20
time: (3, 3) changes: -2
time: (3, 4) changes: -1
time: (3, 6) changes: 1
time: (3, 8) changes: -3
time: (3, 9) changes: 2
That was 100 edges added in at (3,1). I'm a bit scared to subtract input edges because I don't even know which ones are present to subtract, and I don't know what it means to do reachability on a graph with negative edges (probably: non-termination).
But, subtractions in the outputs? What does it all mean?
Differential dataflow is rigged to ensure one key property: for any time, the accumulation of output changes at times <= time should equal the computation applied to the accumulation of input changes at times <= time. Let's work through some examples.
- For time (0,0), there is only the initial input "change", that being the input graph itself, and so the output change should be the computation applied to that one input change.
- For time (0,1), there are a few edge changes. The output change at (0,1) should be whatever it takes to correct the initial output to reflect the correct output on the modified input.
- For times (0,j), the output changes need to reflect changes to the sum of output changes strictly less than (0,j), which should just be the "previous" output at (0,j-1).
These were the easy cases, to the extent that any of this is easy. We talked through them up above, back when time was moving forwards.
- For time (1,0), the world is a bit like at (0,1). There are some changes to the initial input, and our output change should reflect them. Importantly, since neither (0,1) nor (0,2) nor any of those (0,j) for large j are less or equal to (1,0) we should not include their changes in the input computation, nor include their output changes as the basis for our output corrections. We are just doing a diff off of the computation circa (0,0).
- For time (1,1), there were no input changes. Because life isn't fair, that doesn't mean there will be no output changes. In fact, we now have input changes at (0,0), (1,0), and (0,1) to accumulate, and there is no reason to believe that when we apply our computation (graph reachability) it will equal the accumulation of the already produced output changes at those three times. We may need to issue some output changes.

  In particular, like we observed up above, those changes may need to be negative. We may have reported the same positive change in the outputs at both (0,1) and (1,0), and to correct their sum to be the correct output, which has only one instance of each reached node, we may need to subtract some of them. The record isn't going away, it just doesn't need to be added twice. Said differently, the positive change we saw at (0,1) no longer needs to happen at (1,1), because in this timeline it already happened at (1,0).
- For times (1,j), there are issues similar to (1,1). I'm sorry. Sometimes we add things, sometimes we subtract them.
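The cases above can be reproduced on a graph small enough to hold in your head. What follows is a brute-force sketch of the key property only: it recomputes reachability from scratch at every time, which is emphatically not how differential dataflow does its work. All of the names (`Edge`, `reachable_count`, `output_changes`, `example_inputs`) are invented for the example, and I am assuming that node zero counts as reachable from itself.

```rust
use std::collections::{BTreeMap, BTreeSet};

type Time = (u64, u64);
type Edge = (u32, u32);

fn less_equal(a: Time, b: Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Count nodes reachable from node 0 (itself included) along edges whose
/// accumulated multiplicity is positive.
fn reachable_count(edges: &BTreeMap<Edge, i64>) -> i64 {
    let mut seen: BTreeSet<u32> = BTreeSet::new();
    let mut stack: Vec<u32> = vec![0];
    seen.insert(0);
    while let Some(node) = stack.pop() {
        for (&(src, dst), &count) in edges.iter() {
            if src == node && count > 0 && seen.insert(dst) {
                stack.push(dst);
            }
        }
    }
    seen.len() as i64
}

/// For each time t in the grid up to `max`, emit whatever change makes the
/// sum of output changes at times <= t equal the count computed on the sum
/// of input changes at times <= t.
fn output_changes(inputs: &[(Time, Edge, i64)], max: Time) -> BTreeMap<Time, i64> {
    let mut outputs: BTreeMap<Time, i64> = BTreeMap::new();
    // Row-by-row order visits every s <= t (with s != t) before t itself.
    for x in 0..=max.0 {
        for y in 0..=max.1 {
            let t = (x, y);
            let mut edges: BTreeMap<Edge, i64> = BTreeMap::new();
            for &(s, edge, diff) in inputs {
                if less_equal(s, t) {
                    *edges.entry(edge).or_insert(0) += diff;
                }
            }
            let target = reachable_count(&edges);
            let prior: i64 = outputs
                .iter()
                .filter(|(s, _)| less_equal(**s, t))
                .map(|(_, d)| *d)
                .sum();
            if target != prior {
                outputs.insert(t, target - prior);
            }
        }
    }
    outputs
}

/// Edge 0->1 at (0,0), then 0->2 added at (0,1), and 1->2 added at (1,0).
fn example_inputs() -> Vec<(Time, Edge, i64)> {
    vec![((0, 0), (0, 1), 1), ((0, 1), (0, 2), 1), ((1, 0), (1, 2), 1)]
}
```

Calling `output_changes(&example_inputs(), (1, 1))` yields +2 at (0,0), +1 at (0,1), +1 at (1,0), and -1 at (1,1). Both (0,1) and (1,0) independently discover that node 2 is reachable, so at (1,1), where both of their changes accumulate, one of the two has to be taken back. Only edges were added, and yet a subtraction appears.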
Viewed differently, the updates we see at the (1,_) times are the timeline edits required for the (0,_) timeline to incorporate the changes that occurred at (1,0). The updates we see at the (2,_) times are the timeline edits required to the (1,_) timeline to reflect the changes that occurred at (2,3).
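To make "timeline edits" concrete, here is a small sketch that replays the output changes reported above for the (0,_) and (1,_) times and accumulates them into actual counts. I am assuming each reported number is the net change in the count of nodes reachable from node zero, which is how the experiment is described; the `CHANGES` table and the `count_at` helper are names made up for this example.

```rust
type Time = (u64, u64);

fn less_equal(a: Time, b: Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Output changes copied from the text for the (0,_) and (1,_) times.
const CHANGES: &[(Time, i64)] = &[
    ((0, 0), 8049), ((0, 1), -9), ((0, 2), -39), ((0, 3), 5), ((0, 4), 10),
    ((0, 5), 11), ((0, 6), 26), ((0, 7), -5), ((0, 8), -2), ((0, 9), 3),
    ((1, 0), 34), ((1, 1), -1), ((1, 4), 1), ((1, 5), -1), ((1, 6), 3),
    ((1, 8), -4), ((1, 9), 1),
];

/// The reachable-node count at `time`: the sum of all changes at times <= `time`.
fn count_at(time: Time) -> i64 {
    CHANGES
        .iter()
        .filter(|(t, _)| less_equal(*t, time))
        .map(|(_, d)| *d)
        .sum()
}
```

Here `count_at((0, 1))` is 8040, the original timeline after its first round of changes, while `count_at((1, 1))` is 8073, the same moment in the timeline that also contains the edges added at (1,0). The (1,_) changes never stand on their own; they are always read on top of the (0,_) changes beneath them.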
Viewed still differently, we are looking at a second derivative. For time.
In our first trip up above we went deep on the second coordinate, exploring lots of (0,_) times before even considering our first (1,_) time. That's cool, and what we explored was essentially a timestamp that looked like
`(version, history)`
where we used the second history coordinate to maintain full historical detail about a computation, and then ticked forward through version making arbitrary changes to the history.
Of course, we could do things differently.
Up above, we never actually dropped our (0, 10) capability. We didn't have to for differential dataflow to produce all of the outputs we've seen, because all of those output changes are not at times in advance of (0, 10), and so differential dataflow can produce the output with confidence.
This means we can still go and apply some changes at (0, 10). Maybe 10 is "now" and we want to extend our timeline a bit, updating the different versions we've been tracking.
time: (0, 10) changes: 68
time: (1, 10) changes: -1
time: (3, 10) changes: -16
We just did a streaming update to a multi-temporal query. How's your brain doing now?
One popular form of multi-temporal computation connects "system time" and "event time". In this set-up, a stream processor has its own view of time (perhaps system clocks) that ticks forward at a brisk clip, informing its users that they are seeing updated results. At the same time, the input events may have their own timestamps, and they may not be exactly synchronized with the system time.
Despite receiving late (or early!) data, the stream processor still wants to provide both consistent and prompt updates to its reported timeline.
One way to do this, e.g. in Apache Flink, is to use event time as the system time, where the system reports its changes as the event times move forward, but holding back the production of outputs until all inputs with a certain event time have been reached. Apache Beam has some other approach where you use both and there is some magical timeout and it only works for windows or something and blah blah blah... If any of you all know the details, drop me a note.
But, it is all much simpler as multi-temporal computation. What you want are timestamps (system, event) where both coordinates move forward, though their movement is potentially decoupled.
In the example up above, our first round of system time had ten event time updates, and we reported all of them, even though events were "early" in the sense that if the two coordinates of (system, event) should be comparable, then when we processed (0, 4) we were speaking about events at a time (4) in the future of the system (0).
The system time ticked forward and we had an update at (1,0), which would be "late" in that the event time is now behind the system time, and yet we were able to clearly speak about its effect on the timeline (1,_), some updates of which are historical revision (those at (1,0)) and some updates are correcting future speculation (those at (1,x) for x > 1).
Up above we left open the capability (0,10), and there isn't a great reason to do that in this story. It corresponds to retaining the ability to perform far-future event time updates at historical system times, which is really cool, but not something obviously important for the stream processing crowd.
As we run our system time capability forward, repeatedly downgrading to (system, 0), we inform differential dataflow that it is able to discard the distinctions between prior system times. This allows it to compact its representation of the differences, and informally this results in something like a "maintained" event timeline rather than full historical detail.
Similarly, we might want to advance our capability not to (system, 0), but perhaps something like (system, event - slack) for some historical slack beyond which we do not care to accept updates. In prior work, this might be 10s. Here it is whatever you want it to be, driven as you see fit, for computations that may or may not be windowed because why should that be a mandatory part of your computational model.
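A minimal sketch of that slack policy, assuming integer clocks and the pair ordering from earlier. The `capability` and `accepts` functions are hypothetical names for this example, not part of differential dataflow's API; the only rule borrowed from the text is that holding a capability lets you change the input at that time or any future time.

```rust
type Time = (u64, u64);

fn less_equal(a: Time, b: Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Hypothetical policy: hold the capability at (system, event - slack),
/// clamping at zero so an early stream does not underflow.
fn capability(system: u64, event: u64, slack: u64) -> Time {
    (system, event.saturating_sub(slack))
}

/// An update at `time` can still be introduced iff the held capability
/// is less or equal to it.
fn accepts(held: Time, time: Time) -> bool {
    less_equal(held, time)
}
```

With system time 5, event time 100, and a slack of 10, the capability sits at (5, 90). An update at (5, 95) is late but still welcome, an update at (5, 80) is beyond the slack and cannot be introduced, and nothing at system time 4 can be introduced regardless of its event time.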
Another natural mode is for one stream processor to accept updates from another stream processor, where they each run on decoupled clocks but at least give crisp information about when their times are complete. If a differential computation takes input from another stream processor that indicates completeness of its own times, then we can advance times (system, event) precisely using our own local system progress and any event progress supplied from the source stream processor. They do not need to move forward synchronously.
By moving our capabilities forward, we allow differential dataflow to compact historical differences, and if system and event are pretty much the same then we behave almost like a uni-temporal stream processor. Correctness is guaranteed, and performance is what wobbles if the clocks fall out of sync.
There really are a great many more ways to take advantage of partially ordered times and multitemporal analytics. Briefly:
Your stream processor may accept dated statements about future events. Perhaps you run a hotel and want to know, as of various historical dates, what rooms are available in your hotel; then triples (system, booking, visit) may be appropriate to pick apart what is known when (or how to amend assignments if someone cancels a historical event).
Your stream processor may ingest data from different sources with different clocks. Perhaps you use a Twitter feed, real-time flight information, and an hourly updated Spark job. You may not want to block your computation by forcing the three sources into a common event time, in which Spark's latency or Twitter's downtime blocks your analysis. Rather, you might like to proceed ahead and at least be clear about what input data your analysis represents. Timestamps (system, twitter, flights, spark) would let you do this.
Many databases are starting to look more at "early visibility", in which results of transactions are made visible before they have committed, in the interest of prepping subsequent computation. We can use one temporal coordinate to represent an "open transaction identifier" in that we have early results but these results (and those of subsequent transactions) may still change.
Seriously, lots more. But I'm going to stop typing now.
These are actually the conclusions, because time doesn't go backwards.
Differential dataflow programs natively support time-travelling modifications to entire computations. Apparently they have for several years, but it took Renato asking about them to discover this (related: how rare is discovery in computer science, amirite?).
Some databases support time-traveling queries, and some debugging systems also allow you to "travel" backwards in time by inspecting your history in a time-ordered view. We are talking about something different here, which is forking your timeline and playing it forward again. Not just "what did my computation look like last week?", but "if I added a record one week ago, how would things look now?" and even "how would the entire week have looked?".
This is a bit like live programming, in which your edits to the definition of your computation are immediately (or thereabouts) reflected in the output. We are talking instead about data rather than computation, and at a potentially massive scale and throughput.
I don't know of any time-traveling stream processors. Other than differential dataflow, of course. It arrived in a strange vibrating metallic capsule from the year 2147, and we are only beginning to understand its capabilities.