Differential datalog

In this post we will look at a recently re-popularized programming language, Datalog, and how it fits in with differential dataflow. In particular, we will see that

  1. Differential dataflow can easily express Datalog programs, which are mostly just map, filter, and join operators with one iterate wrapped around the whole thing.
  2. The absolute performance of Datalog computations using differential dataflow on my laptop can be as good as, or better than, current cluster-based research prototypes from the databases community.
  3. Differential dataflow enables interactive Datalog computations, both in the sense that
    1. the source relations can be changed arbitrarily, with results automatically corrected, and
    2. top-down queries can be performed without evaluating the entire computation.

Most of this work started up while I was at ETH Zurich's Systems Group, working with John Liagouris and Zaheer Chothia. At the very end, I mention some more recent work with Semih Salihoglu and Khaled Ammar at Waterloo.

Thanks go to John Liagouris who gave some early comments on this post, and supplied measurements for the larger (non-laptop fitting) computations.

Datalog

For those of you not familiar with Datalog, let's start with a brief review. The Datalog wikipedia page also has some good information, and I'm largely going to use their examples because they are nice and clear.

Datalog is based on the idea that you define (i) facts (which you can think of as records), and (ii) rules to produce more facts from your current facts. It is a bit like a logic programming language with a relatively simple set of rules, corresponding to SQL-like selections, projections, and joins.

Facts

Here are some example facts, literally taken from the wikipedia page (so they must be true):

parent(bill, mary)
parent(mary, john)

These are now two facts, read in this case as "bill is a parent of mary" and "mary is a parent of john", unless you want them to be the other way around.

Rules

Here are some example rules, again literally taken from the wikipedia page:

ancestor(X,Y) := parent(X,Y)
ancestor(X,Z) := ancestor(X,Y), parent(Y,Z)

These two rules define new facts: two people are related by ancestry if (i) one is the parent of another, or (ii) one is the ancestor of a parent of the other. Importantly, the rules can be recursive: for example, the second rule uses its own value as part of its definition.

Notice that while the first rule is pretty simple, the second rule involves two relations and crucially the shared variable Y appears in both on the right hand side. This means that we get a new ancestor fact only when tuples in the parent and ancestor relation line up just so, with one of their fields. It is basically a relational equijoin between ancestor and parent, with the caveat that its ancestor input relation is going to grow as the computation runs.

The rules are executed until they have produced all the facts they could possibly produce. For the facts above, we should get

ancestor(bill, mary)
ancestor(mary, john)
ancestor(bill, john)

Strictly speaking, a Datalog program only describes how to determine the ancestor facts, it doesn't require you to compute them until someone asks "what facts are in ancestor?"

Queries

While we could just run that computation above on all of our base facts, until we produce all ancestor facts, we may end up with way more data than we actually want. As a thought experiment, think about how many ancestor(X,Y) tuples we might end up with given at most some number of parent(X,Y) tuples. How many ancestors do you have? Probably "two-to-the-something", right? Maybe more if you count frogs and stuff.

Now multiply that by seven billion.

Instead, Datalog provides support for querying relations. Here is an example query, again from the wikipedia page:

?- ancestor(bill, X)

This query asks: for which values of X is ancestor(bill, X) a fact? This sort of question can in principle be asked and answered without computing the ancestry of entirely unrelated billions-large populations, which is appealing for people who actually want to use these sorts of systems rather than watch them run.

We will learn how to efficiently query Datalog computations in this post, but I think it is fair to say that not a lot of the recent big data systems support this, if any of them do.

Differential dataflow

Of course, by now all of you are probably familiar with differential dataflow, but let's review some key concepts for folks who may have just gotten here. There is also an introductory series that might be helpful (part 1, part 2). Differential dataflow does not seem to have a wikipedia page, so I am going to have to make all of this up.

I'll use the running example of Datalog, with the corresponding concepts in differential dataflow called out.

Facts

Differential dataflow works with collections of records. We can think of records as corresponding to facts to help out the connection to Datalog, but there are some important differences. The main one is that Datalog heavily relies on the sane position that once known, facts do not become un-true. In differential dataflow, the contents of a collection can change arbitrarily, and records that used to be in a collection may not be in the collection in the near future. This makes it more powerful, but with great power comes great donuts.

How do we introduce records into a differential dataflow computation? We need to name a new source of records, whose contents we can either supply explicitly (like the Datalog fragment above does with the parent relation), or update interactively as the computation runs.

In the context of a timely dataflow scope, here represented by scope, we would write:

let dataz = vec![ (("bill", "mary"), 1), (("mary", "john"), 1)];
let parents = dataz.to_stream(scope).as_collection();

Let it be known that differential dataflow is not yet optimized for the succinct expression of things.

What we are seeing here is the statement of tuples

(("bill", "mary"), 1)
(("mary", "john"), 1)

which indicate the intended pairs of the parents collection, with their initial multiplicity. Notice that I wrote "collection" rather than "relation": the underlying sets in differential dataflow are multi-sets rather than sets, the distinction being that the former support multiplicities, which is (i) a strict generalization of sets, and (ii) important to support if you want things to go differential-fast.

If we would rather wait to supply the initial facts for the parents collection, we can also just assemble an interactive input, written as so using timely and differential dataflow:

let (handle, parents) = scope.new_input();
let parents = parents.as_collection();

// ... define dataflow graph

handle.send((("bill", "mary"), 1));
handle.send((("mary", "john"), 1));

This creates a timely (handle, stream) pair, and interprets the stream as a differential dataflow collection. Once we've defined a computation, we can interact with handle to cause records to be added to and removed from the parents collection.

Rules

Differential dataflow supports iterative computation, but unlike Datalog we must explicitly assemble it. Datalog, by comparison, has an implicit loop around everything, and it is the existence of the cycle in the definition of ancestor that causes iteration to happen. Making iteration explicit simplifies many things, including the semantics of programs with negation and non-monotonicity, which are quite simple in differential dataflow and markedly less so in Datalog.

Explicit iteration does not need to be hard to express, and has the advantage that it is relatively unambiguous.

parents.iterate(|ancestors| {
    let parents = parents.enter(&ancestors.scope());
    ancestors.map(|(x,y)| (y,x))
             .join_map(&parents, |&y,&x,&z| (x,z))
             .concat(&ancestors)
             .distinct()
})

This computation, which we can agree involves several more characters (specifically the & character) than the Datalog example above, describes an iterative computation starting from parents, which in each round of iteration joins the current result ancestors with parents, adds in whatever used to be in ancestors, takes the distinct results (these are multisets, remember), and uses that for the value of ancestors in the next iteration.

This is basically the same query as written in Datalog up above, just with lots more symbols and concepts. Why would we do such a thing?

Writing your query as we've done above makes it clear that the final value of ancestors is determined functionally from its input parents. If the collection parents changes for some reason, there is a totally clear understanding of what should happen to ancestors. This will allow us to very efficiently interactively update Datalog-like computations.

The explicit iteration also clarifies what happens if you write a program where records come and go with each iteration: you keep running the iteration until it stops changing, without panicking about the non-monotonicity. The corresponding concept in Datalog is "stratification", which is a way of structuring your possibly self-referential rules to ensure that there is one consistent interpretation. This can be quite a pain, and I personally think it is great just to skip the horror in the first place.

Queries

Differential dataflow doesn't have a notion of "queries"; it just computes the collections you ask it to compute. In the example above, it will determine all records in the ancestors collection and show them to you. Fortunately, differential dataflow's ability to interactively update input collections will allow us to implement queries against the results of Datalog programs. Let's talk through updating input collections, and build towards querying.

Differential dataflow lets you supply new records to input collections, and it will determine the consequences and show the results to you.

Recall our example of the (handle, parents) result from creating a new input stream. The handle instance allows us to introduce new changes to the input relation. Rather than use weight 1 like we did to introduce the records, we can use other weights, like:

handle.send((("bill", "mary"), -1));
handle.send((("paul", "mary"),  1));

This causes the parents relation to change, removing bill as a parent of mary and replacing him with paul.

Because differential dataflow is awesome, the collection resulting from the iterate call will be updated with

(("bill", "mary"), -1)
(("bill", "john"), -1)
(("paul", "mary"),  1)
(("paul", "john"),  1)

These are the changes in the ancestors relation that result from the changes to the inputs. Notice that we don't see anything about the tuple ("mary", "john"), because nothing about that tuple has changed. Amazing.

This isn't a query yet, though. We've just screwed around with data, rather than answering something like

?- ancestor(bill, X)

To introduce queries, we'll first need to introduce the idea of "magic sets". Magic sets are rarely well explained, but the intuition is that you can rewrite queries like the above, which only want to see answers involving bill, to a data-driven evaluation starting from the tuple bill and generating only those tuples that might result in bill.

For example, in the ancestors computation we can determine that the first field of the ancestor relation can only result from the first field of the parent relation. Rather than start our ancestors relation with all tuples in the parents collection, we could seed it with those that might possibly produce bill as the ancestor.

This is just a one-line change to our program above:

parents.filter(|x| x.0 == "bill")
       .iterate(|ancestors| {
            let parents = parents.enter(&ancestors.scope());
            ancestors.map(|(x,y)| (y,x))
                     .join_map(&parents, |&y,&x,&z| (x,z))
                     .concat(&ancestors)
                     .distinct()
})

As cool as this is, we don't really want to write and run separate programs for each query we have. How about instead of hard-coding "bill" into our program, we have another input that lets us determine which people to start our queries from?

let (handle, query) = scope.new_input();
let query = query.as_collection();

parents.semijoin(&query)
       .iterate(|ancestors| {
            let parents = parents.enter(&ancestors.scope());
            ancestors.map(|(x,y)| (y,x))
                     .join_map(&parents, |&y,&x,&z| (x,z))
                     .concat(&ancestors)
                     .distinct()
})

We've just added the semijoin(&query) restriction on the parents collection, filtering the initial parent tuples down to those whose first field is present in query (this is what a semijoin does). Now rather than computing all the billions of ancestor relations, we only work through those related to the subjects of query. In fact, we've stumbled across the "reachability" query, which we will revisit when we get to evaluating these computations.

Magic sets are a pretty cool concept, and they generalize quite a bit. I've literally chosen the easiest-to-explain variant of them, and it probably needs a Datalog expert to show up and opine on how broadly useful the technique I've just described really is. However, once you do the transformation, differential dataflow's interactive updates allow you to ask queries of Datalog programs with pretty swank latencies.

Evaluation

While it might be easy (ha!) to write these programs, do they actually work well?

In short, yes. Differential dataflow is totally capable of doing things like join, concat, distinct, and iterate, and it does them all with flair and aplomb. It could probably get even faster if anyone other than me was writing the code, but ... it seems like it is already faster than most of the other systems out there. Pow!

Let's grab a very recent paper from UCLA on Big Datalog. So recent, it hasn't been presented yet (SIGMOD 2016 is coming up; you can see the paper presented there!). They do several sorts of Datalog computations in this paper, and compare them to existing systems like SociaLite and Myria. It's a nice paper and you should all give it a read.

However, there is no differential dataflow comparison, so ... let's fix that for them.

Now, to be totally clear I'm going to pick and choose things to evaluate. I am not going to evaluate everything, because some parts of the evaluation are stressing things like "how much RAM do you have", and the answer is that I don't have very much on my laptop, at least relative to the clusters they are using. So, when they go and compute the full transitive closure of some large graphs, I'm going to plead uncle (note: uncle not contained in the ancestor relation).

On the other hand, rather than computing the full transitive closure, we can use magic sets to query elements of the transitive closure, in less time than it takes these systems to even start up. That's kind of cool. I'm mostly interested in talking through the cool things, so that's a good fit for this post.

Transitive closure and same generation

We saw transitive closure up above with the ancestor relation, but let's write it again using the terms of the Big Datalog paper. This is their Program 1:

tc(X,Y) := arc(X,Y)
tc(X,Y) := tc(X,Z), arc(Z,Y)

We will also look at the "same generation" query, which finds people who are the same number of generations from some common ancestor. This is their Program 2:

sg(X,Y) := arc(P,X), arc(P,Y), X != Y
sg(X,Y) := arc(A,X), sg(A,B), arc(B,Y)

The paper has some tree-like graphs they are going to use for evaluation:

| name   | nodes      | edges      | #(tc)       | #(sg)         |
|--------|------------|------------|-------------|---------------|
| Tree11 | 71,391     | 71,390     | 805,001     | 2,086,271,974 |
| Tree17 | 13,766,856 | 13,766,855 | 237,977,708 | too big       |

The size of the transitive closure is mostly related to the depth of the tree times the number of nodes, and the size of the same generation relation is just really large; so large that they didn't figure out what it was for depth 17. Of course, it is just the sum of the squares of the number of nodes at each level of the tree, so they could have figured it out, but maybe it was just too large to print (it was the one thing their system couldn't compute).

We've seen how to write transitive closure in differential dataflow up above, but let's write the "same generation" query in differential dataflow:

// seed with pairs that share a parent; each iteration steps a same-generation
// pair (a,b) down to pairs (x,y) of their children.
let initial = parents.join_map(&parents, |_p,&x,&y| (x,y));
initial.iterate(|same_gen| {
           let parents = parents.enter(&same_gen.scope());
           let initial = initial.enter(&same_gen.scope());
           same_gen.join_map(&parents, |_a,&b,&x| (b,x))
                   .join_map(&parents, |_b,&x,&y| (x,y))
                   .concat(&initial)
                   .distinct()
})

Neat! Not so horrible, but not quite as magically short as up above.

Here are the measurements they have for transitive closure on Tree17 and same generation on Tree11. These are taken from the Big Datalog paper, where they have a cluster of 16 machines each with 4 cores (8 threads).

| problem | BigDatalog | SociaLite | Myria | Spark |
|---------|------------|-----------|-------|-------|
| tc (17) | 49s        | > 1 day   | 91s   | 244s  |
| sg (11) | 53s        | OOM       | 822s  | OOM   |

So it seems that Big Datalog is good at doing these sorts of queries, at least relative to the existing systems. To try and be fair to the existing systems, SociaLite presents a Datalog interface for iterative graph processing but does not claim to be a full-blown Datalog system, and Spark is clearly just here as a punching bag. Pow!

Let's get a differential dataflow measurement!

Before showing you any numbers I should say that their tree data are randomly generated and I couldn't get trees with the same statistics. Instead, I used deterministic trees in which each node has three descendants, with the depth chosen so that we get at least as many nodes as they had. For me that means

| name   | nodes      | edges      |
|--------|------------|------------|
| Tree11 | 88,573     | 88,572     |
| Tree17 | 21,523,360 | 21,523,359 |

These graphs are a bit bigger than above, so it shouldn't be much of a cheat unless the weird random structure of the trees leads to weird performance. I hope this isn't the case, because that would be some crazy eval then. Here is differential using one and two worker threads on my laptop, against the cluster numbers:

| problem | BigDatalog | differential |
|---------|------------|--------------|
| tc (17) | 49s        | 72s / 45s    |
| sg (11) | 53s        | OOM          |

Ok, differential dataflow went out of memory on the same generation query on my laptop. This actually makes lots of sense, because the limiting aspect of the sg program is that it needs to keep all of the two billion result tuples (8 bytes each) around somehow so that it can suppress duplicates. Actually, to be totally honest I didn't even run the sg query, I just wrote OOM.

The sg results are a crazy expansive way to represent what could be pretty compact information. For example, if the "same generation" relation were an equivalence relation, as is the case for tree-structured input data, then we could just represent the sets of people in each generation, rather than producing the quadratically large set of all pairs of them.

Let's do something that works even when "same generation" isn't an equivalence relation (e.g. sg(A,B) may be due to one ancestor, and sg(B,C) due to a different ancestor): Each ancestor will announce its name and its distance to each of its immediate descendants, who each increment the distance and pass it along to each of their descendants. The computation tracks for each person the distance to each of its ancestors.

We could then produce the sg relation by joining on (ancestor, distance), but we won't do that because it's really, really expensive (and expansive).

parents.map(|(p,c)| (c,(p,1)))
       .iterate(|distances| {
           let parents = parents.enter(&distances.scope());
           parents.join_map(&distances, |_p,&c,&(a,d)| (c,(a,d+1)))
                  .concat(&distances)
                  .distinct()
       })

This computes the distances from each node to each of its ancestors. It is worth noting that this could go on forever if there is a cycle in the graph. Whoa! You can't do that in Datalog, which reassuringly always terminates. But, this also means that you can't implement this program in Datalog either. Oops. Maybe you could implement a similar program, or maybe you have to materialize all quadratically many pairs. Who knows?
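For completeness, here is a sketch of the join on (ancestor, distance) mentioned above, which would recover sg from the compact representation; it materializes exactly the quadratic expansion we are trying to avoid, so treat it as illustration only (the name distances is an assumption for the output of the snippet above):

// Hypothetical: rebuild sg(X,Y) by self-joining the (person, (ancestor, distance))
// collection on its (ancestor, distance) key. This re-creates the quadratically
// large relation, which is the whole problem.
let keyed = distances.map(|(c,(a,d))| ((a,d), c));
let sg = keyed.join_map(&keyed, |_ad, &x, &y| (x, y))
              .filter(|&(x,y)| x != y)
              .distinct();

Even for tree11 this would be about two billion tuples, which is why we aren't going to do it.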

Let's update the numbers above, though I should stress that we aren't doing the same sg computation that Big Datalog is doing. We are doing something that I think is smarter, and certainly goes a lot faster even on my laptop. We can even do the sg measurement for the larger Tree17-class graph:

| problem     | BigDatalog | differential  | #(sg)               |
|-------------|------------|---------------|---------------------|
| sg (tree11) | 53s        | 0.26s / 0.16s | 1,961,271,939       |
| sg (tree17) | DNF        | 94s / 58s     | 115,813,751,041,560 |

Of course, we did this by writing a different program that didn't try to materialize a quadratic expansion of the input. A program you may not be able to write in Datalog, and which these systems may or may not be able to run. Also, a program that might explode if you give it dodgy data, and whose performance may vary wildly if you provide it with non-tree input. The point isn't meant to be that differential is universally faster, but that some degree of flexibility in your programming model can really help.

Graph computation

The Big Datalog paper continues with graph computations: reachability, connectivity, and single-source shortest paths. These are fairly standard graph computations (what, no PageRank?!?), with the property that they can be expressed in Datalog if you don't mind putting some aggregation in at the end (e.g. produce several short paths in Datalog, then take the shortest of them).

I should say, having just done a crap-ton of OSDI reviews, that I'm getting pretty tired of people just slapping down numbers and saying "ours are better; paper please". It is important to understand what is actually going on that makes the numbers better, because therein lies the science.

That being said, I'm about to slap down some numbers and observe "ours are better" without explaining why they should be better.

Here is the deal: the numbers shouldn't be better. Differential dataflow is doing no less than the other systems, and in fact it is doing a fair bit more by leaving the computation in a state where it can be incrementally updated. Perhaps this leaves you in a somewhat dissatisfied state.

I couldn't figure out how they chose the edge lengths for SSSP, so I'm going to assume they used length one for each edge; drop me a line if I'm wrong. All non-differential numbers are from the Big Datalog paper, on a cluster of 16 machines with 4 cores (8 threads) each. The differential numbers are from my laptop (16GB RAM, 1 and 2 cores) for the two smaller datasets (livejournal and orkut), and from an Intel Xeon E5-4640 @ 2.40GHz machine with 32 physical cores and 512GB of RAM for the twitter dataset (reporting 1 and 16 cores); my laptop doesn't have quite enough RAM for how differential dataflow represents 1.6B edges.

I don't have numbers for the arabic dataset because I don't have it and need some Java decompression library to open it up, and it's just not working on my laptop. Caveat emptor.

Reachability

| graph       | BigDatalog | SociaLite | Myria | GraphX | differential |
|-------------|------------|-----------|-------|--------|--------------|
| livejournal | 17s        | 52s       | 5s    | 36s    | 10s / 6s     |
| orkut       | 20s        | 67s       | 6s    | 48s    | 16s / 10s    |
| arabic      | 71s        | 464s      | 35s   | 112s   |              |
| twitter     | 125s       | 755s      | 102s  | 3677s  | 108s / 16s   |

As we discussed earlier, one can view reachability as the magic set transformation of the transitive closure computation. I thought it would be interesting to also check out how long it takes to compute the reachable set from a vertex once the computation is up and running, as this is probably more representative of the sorts of queries you would want to run. Wait for the "complex data analytics" section; we'll do something like that.

Connectivity

| graph       | BigDatalog | SociaLite | Myria | GraphX | differential |
|-------------|------------|-----------|-------|--------|--------------|
| livejournal | 27s        | 54s       | 39s   | 59s    | 24s / 14s    |
| orkut       | 33s        | 78s       | 57s   | 53s    | 41s / 27s    |
| arabic      | 213s       | 467s      | 485s  | 174s   |              |
| twitter     | 307s       | OOM       | 1051s | 12041s | 763s / 121s  |

These measurements seem a bit off, personally. When we evaluated Myria on twitter, it took less than 700s to do connectivity on just a single machine. The GraphX paper reports 251s for connectivity on twitter using as many machines each with a bit more memory; the three hour measurement above is probably just measuring how the system pages when it doesn't fit in RAM.

Of course, graph connectivity is something one can do much faster if the goal is actually to compute connected components, rather than have systems shoot-outs. For the twitter dataset, it takes between 15s and 30s to compute undirected connectivity using something like union-find, depending on how you've represented your graph. All of these systems take longer than this (except SociaLite; we have no idea how long it took to run out of memory), with relatively little discussion of why you would use them for this task when 10x faster single-threaded code exists.
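For reference, here is a minimal sketch of the sort of single-threaded union-find baseline being alluded to (no union-by-rank, and the edge list is assumed to already sit in memory as (u32, u32) pairs; the 15s-30s figures above come from code along these lines, not from this exact snippet):

// Single-threaded undirected connectivity via union-find (sketch).
fn connected_components(nodes: usize, edges: &[(u32, u32)]) -> Vec<u32> {
    let mut root: Vec<u32> = (0..nodes as u32).collect();

    // find the representative of x, halving paths as we go.
    fn find(root: &mut Vec<u32>, mut x: u32) -> u32 {
        while root[x as usize] != x {
            let grandparent = root[root[x as usize] as usize];
            root[x as usize] = grandparent;
            x = grandparent;
        }
        x
    }

    // merge the endpoints of each edge (no union-by-rank, for brevity).
    for &(src, dst) in edges {
        let a = find(&mut root, src);
        let b = find(&mut root, dst);
        if a != b { root[a as usize] = b; }
    }

    // point each node directly at its final representative.
    for i in 0..nodes as u32 {
        let r = find(&mut root, i);
        root[i as usize] = r;
    }
    root
}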

For my part, differential dataflow has this neat "automatically incrementalizes" property which might be a good reason to use it rather than single-threaded union-find.

Shortest paths

| graph       | BigDatalog | SociaLite | Myria | GraphX | differential |
|-------------|------------|-----------|-------|--------|--------------|
| livejournal | 53s        | 172s      | 70s   | 36s    | 12s / 7s     |
| orkut       | 39s        | 106s      | 44s   | 48s    | 19s / 10s    |
| arabic      | 276s       | 606s      | 1083s | 112s   |              |
| twitter     | 260s       | OOM       | 1593s | 3677s  | 111s / 18s   |

The shortest paths computation is basically the same as reachability. In each round, both identify the same sets of reached vertices, just SSSP also tracks a distance with each. Since (as I understand it) the distance is just the round number, new vertices get suppressed the second time they are seen, just like with reachability. It is a bit of a mystery why these systems take so much more time than for reachability, so perhaps there is something I'm missing about how edge lengths were set.

It is possible that these systems are getting stressed out because, unlike with reachability, the payload that gets carried around (distance) has to be treated as some weird non-monotonic property. In differential dataflow it is just data and you do whatever your program says to do with it, but the Datalog folks seem to view this the same way a cat views a cucumber.

Complex data analytics

The paper continues with some more interesting computations: the "people you may know" query and something about multi-level marketing. The first query asks, for a given user, for its top-k non-friends ranked by number of mutual friends. Then you have to take the results and look them up in some external database.

These are the numbers the Big Datalog paper reports for the "people you may know" query, where as far as I can tell from their text and program, they are doing the computation for just one user rather than all of them.

| graph       | BigDatalog | GraphX+SparkSQL | Spark |
|-------------|------------|-----------------|-------|
| livejournal | 83s        | 85s             | 130s  |
| orkut       | 80s        | 79s             | 133s  |
| arabic      | 225s       | 250s            | 241s  |
| twitter     | 948s       | 1129s           | 3298s |

I don't have such an external database, and I haven't implemented anything like an antijoin yet (cogroup would probably work, but it's currently broken and I'm lazy). So, I thought I'd take this opportunity to show differential doing something that I think is much cooler, especially as relates to magic sets and querying.

Here is the query, Program 9 from the Big Datalog paper.

uarc(X, Y) := arc(X, Y)
uarc(Y, X) := arc(X, Y)
cnt(Y, Z, count⟨X⟩) := uarc(X, Y), uarc(X, Z), Y != Z, ∼uarc(Y, Z)
pymk(X, W9, topk⟨10, Z⟩) := cnt(X, $ID, Z), pages(X, W2, ..., W9)

There is an identifier $ID baked into the query, in the same way that it is baked into the reachability and shortest paths queries, and you can see that for populating the pymk relation we only really care to evaluate cnt(_, $ID, _). But, if you are doing bottom-up semi-naïve Datalog evaluation, and I suspect these folks are, you are probably going to evaluate pretty much all of the cnt relation. You can see that they sure take a while to do the computation in the numbers up above.

Let's do something smarter. Something "magic".

Imagine that rather than baking $ID into the query, you have an additional relation query which contains identifiers you are interested in. Then you could write a differential dataflow query:

// symmetrize the graph, because they do that too.
let graph = graph.map(|(x,y)| (y,x)).concat(&graph);

graph.semijoin(&query)
     .map(|(x,y)| (y,x))
     .join(&graph)
     .map(|(y,x,z)| (x,z))
     .filter(|&(x,z)| x != z)
     // <-- put antijoin here if you had one
     .topk(10)

Now, differential dataflow doesn't have a topk operator, so I wrote one out longhand (it's three lines, but they aren't pretty enough to share with you).
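In case you are curious, a longhand topk might look roughly like the following sketch. The name candidates (the (x, z) collection built just above) and the exact shape of the group closure are assumptions, modeled on the group_u used in the join! macro later in this post; the real three lines may differ.

// Hypothetical longhand top-k: for each source x, keep the ten candidates z
// with the largest multiplicities (i.e. mutual-friend counts). Assumes `group`
// presents (value, count) pairs per key, like the `group_u` in the macro below,
// and that z is a small Copy identifier.
candidates.group(|_x, src, out| {
    let mut scored: Vec<_> = src.map(|(&z, count)| (count, z)).collect();
    scored.sort_by(|a, b| b.0.cmp(&a.0));              // largest counts first
    for &(count, z) in scored.iter().take(10) {
        out.push(((z, count), 1));
    }
})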

Elsewhere in the program, I load up the graph and then repeatedly introduce random node identifiers into the query collection, one at a time, and record the amount of time it takes before the topk results come back. For the livejournal graph dataset, it looks something like this:

loaded: Duration { secs: 9, nanos: 825282053 }

latency: Duration { secs: 0, nanos: 75315087 }
latency: Duration { secs: 0, nanos: 160826 }
latency: Duration { secs: 0, nanos: 468201 }
latency: Duration { secs: 0, nanos: 70606 }
latency: Duration { secs: 0, nanos: 182185 }
latency: Duration { secs: 0, nanos: 363024 }
latency: Duration { secs: 0, nanos: 49539 }
latency: Duration { secs: 0, nanos: 59684788 }
latency: Duration { secs: 0, nanos: 112475 }
latency: Duration { secs: 0, nanos: 2287092 }

So, although it takes about 10s to load up the data, we stab out and get all of the friends-of-friends of our target $ID people in times ranging from 49us to 75ms. That's pretty cool. It's also a lot faster than doing a fresh computation with $ID baked into it for each distinct $ID.

We can do the same thing with the orkut graph dataset:

loaded: Duration { secs: 19, nanos: 374305823 }

latency: Duration { secs: 0, nanos: 32116157 }
latency: Duration { secs: 0, nanos: 19210722 }
latency: Duration { secs: 0, nanos: 525633 }
latency: Duration { secs: 0, nanos: 25477877 }
latency: Duration { secs: 0, nanos: 1318900 }
latency: Duration { secs: 0, nanos: 2835391 }
latency: Duration { secs: 0, nanos: 5844649 }
latency: Duration { secs: 0, nanos: 1891151 }
latency: Duration { secs: 0, nanos: 255116 }
latency: Duration { secs: 0, nanos: 4386029 }

It takes a little longer to start up, because the graph is a little larger, and now the times range from 255us to 32ms. Pretty neat, huh?

Let's do the twitter graph next.

FAIL

Yeah, it needs 32GB of RAM for some reason, and my laptop only has 16GB, but whatever. If I had a fancier computer I'd be all over this. You'd be getting friends of friends back in milliseconds, and I'd be strutting. Maybe next hardware refresh...

Instead, numbers on the Xeon courtesy John Liagouris (using one core):

loaded: Duration { secs: 259, nanos: 409523706 }

latency: Duration { secs: 0, nanos: 790721934 }
latency: Duration { secs: 0, nanos: 464077395 }
latency: Duration { secs: 0, nanos: 944770481 }
latency: Duration { secs: 0, nanos: 73499 }
latency: Duration { secs: 0, nanos: 16649 }
latency: Duration { secs: 0, nanos: 14208 }
latency: Duration { secs: 1, nanos: 367248239 }
latency: Duration { secs: 0, nanos: 256860947 }
latency: Duration { secs: 3, nanos: 484397631 }
latency: Duration { secs: 0, nanos: 64377 }
latency: Duration { secs: 4, nanos: 707158442 }

The loading time drops to about 52 seconds using 16 cores. Some of the latencies are small because, of the 65 million node identifiers, only 42 million actually have incident edges. All of the "tens of microseconds" results are likely just zero-degree vertices. Most of the other numbers are "hundreds of milliseconds", though a few of the queries run to a few seconds.

The multi-level marketing example had a complicated program and just a distasteful feel to it, so I didn't do anything there. It does highlight the point that as programs get more complicated, the value of a layer that manages the planning for you increases. This is the point where Datalog experts can show up and shine, by doing lots of clever transformations to the computation rather than me typing things out by hand. I'm happy to concede that point.

Dynamic data

It is worth mentioning that all of the differential dataflow computations above work when the underlying relations change too. I can't be bothered to show you all the times, but for small changes they are all sub-second updates, and often sub-millisecond. It is a lot like the times we saw for making queries in the pymk computation, but you are updating the graphs instead of the things that go and meet the graphs. Whatever, it's fast.
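Concretely, a graph update uses the same idiom as the parent updates earlier: send weighted differences at the input handle and let the dataflow sort out the consequences. A sketch, with a made-up handle name and node identifiers:

// retract one edge and introduce another; reachability, connectivity, pymk,
// and friends built from this input are corrected automatically.
graph_handle.send(((5u32, 7u32), -1));   // remove edge 5 -> 7
graph_handle.send(((5u32, 9u32),  1));   // add edge 5 -> 9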

LogicBlox is the only other system I'm aware of that steps up to the "interactively update recursive queries" plate. We did a bake off and differential dataflow ended up ahead, but we also had interactive access to the differential dataflow source code, whereas the LogicBlox implementation is a grown-up product that probably doesn't get hacked on just because some query goes faster when you do.

It's a bit disappointing that so few big data systems are stepping up to the real-time analytics plate. Handing back results to the "people you may know" query, which the distributed systems do in minutes, takes differential dataflow milliseconds. Doesn't that sound like a better way to do things, even just maybe?

Caveats

It is worth asking "what is the trade-off with differential dataflow?" Why has it not overtaken the world?

There are some down sides that I thought I would try and be candid about, in the interest of full disclosure. The two that I can think of are:

  1. Compile times. Differential dataflow is built on Rust, and Rust is compiled down through LLVM. Specifically, it is very aggressively compiled, and the amount of time between writing your program and seeing results the first time can be quite high. Of course, the time to launch the program the next time is trivial, but it is something to factor in if your goal is to make money with infinite data scientists typing on infinite macbooks.

  2. Cognitive overhead. Differential dataflow seems to have relatively simple programs once you write them, but there can be a disconnect between what you want to happen and what it does. It is rather clever on your behalf, and almost 100% of the time you try and be even more clever to help it, you undo the cleverness. This lack of intuitive bridge can make it hard for people exploring to directly write the program they want, because they aren't really sure what is going to happen. Perhaps making the methods ALLCAPS would help.

You've probably got others too, right? Let me know, and I can put them on the list!

For example, maybe "3. author is insufferable".

Mutual recursion

Although not considered in the Big Datalog paper, one of the neat things about Datalog is that it supports mutual recursion: You can have two relations defined in terms of each other:

relation1(X,Y) := relation2(Y,X)
relation2(X,Y) := relation1(X,Y)

This is a pretty dumb example, but you could imagine things being more interesting. For example, we got into Datalog when John Liagouris wanted to evaluate (read more about this query in our work on explaining outputs):

p(X,Z)   :- p(X,Y), p(Y,Z)
q(X,Y,Z) :- p(X,W), q(W,Y,Z)
q(X,P,W) :- q(X,R,W), r(R,P)
p(X,W)   :- p(X,Y), p(X,Z), c(Y,Z,W)
p(X,W)   :- q(X,R,Y), p(Y,Z), u(R,Z,W)
q(X,Y,Z) :- q(X,R,W), q(W,P,Z), t(R,P,Y)

This has a bit of mutual recursion going on.

One problem with mutual recursion, at least as it relates to our differential dataflow story, is that it is hard to write one iterate block that determines the fixed point of multiple recursively defined collections. The iterate operator just lets you iterate one collection, and you have more than one in mind.

Fortunately, you can get around this pretty easily in differential dataflow. Differential dataflow provides an imperative construct Variable for iterative collections, which lets you write code like:

// (here `self` is the source collection and `logic` is the closure passed to `iterate`)
self.inner.scope().scoped(|subgraph| {
    let variable = Variable::from(self.enter(subgraph));
    let result = logic(&variable);
    variable.set(&result);
    result.leave()
})

This is actually the implementation of iterate, which just creates a new iterative context, declares a variable, and then sets the value of that variable to be some computed value that is allowed to use the variable in its definition. While you are there, check out how short the implementation of iteration is in differential dataflow. Pretty sweet.

You can do exactly the same thing with multiple collections; we even created a version optimized for monotonic collections. The result is something that has a bit more of the look and feel of Datalog; we can write John's query up above using the only slightly horrible:

// IR1: p(x,z) := p(x,y), p(y,z)
let ir1 = p.map(|(x,y)| (y,x))
           .join_map_u(&p, |_y,&x,&z| (x,z));
p.add(&ir1);

// IR2: q(x,r,z) := p(x,y), q(y,r,z)
let ir2 = p.map(|(x,y)| (y,x))
           .join_map_u(&q.map(|(y,r,z)| (y,(r,z))), |_y,&x,&(r,z)| (x,r,z));
q.add(&ir2);

// IR3: p(x,z) := p(y,w), u(w,r,z), q(x,r,y)
let ir3 = p.map(|(y,w)| (w,y))
           .join_map_u(&u.map(|(w,r,z)| (w,(r,z))), |_w,&y,&(r,z)| ((y,r),z))
           .join_map(&q.map(|(x,r,y)| ((y,r),x)), |_yr,&z,&x| (x,z));
p.add(&ir3);

// IR4: p(x,z) := c(y,w,z), p(x,w), p(x,y)
let ir4 = c.map(|(y,w,z)| (w,(y,z)))
           .join_map_u(&p.map(|(x,w)| (w,x)), |_w,&(y,z),&x| ((x,y),z))
           .semijoin(&p)
           .map(|((x,_y),z)| (x,z));
p.add(&ir4);

// IR5: q(x,q,z) := q(x,r,z), s(r,q)
let ir5 = q.map(|(x,r,z)| (r,(x,z)))
           .join_map_u(&s, |_r,&(x,z),&q| (x,q,z));
q.add(&ir5);

// IR6: q(x,e,o) := q(x,y,z), r(y,u,e), q(z,u,o)
let ir6 = q.map(|(x,y,z)| (y,(x,z)))
           .join_map_u(&r.map(|(y,u,e)| (y,(u,e))), |_y,&(x,z),&(u,e)| ((z,u),(x,e)))
           .join_map(&q.map(|(z,u,o)| ((z,u),o)), |_zu,&(x,e),&o| (x,e,o));
q.add(&ir6);

There is obviously still a lot more noise in here than in the Datalog query, but we can take solace in the fact that (i) you could do it automatically if you wanted, and (ii) it goes quite fast.

The code is checked in as src/examples/dataflog.rs, and is relatively readable. :)
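If it helps, the scaffolding around those six rules looks roughly like the following sketch: each recursive relation becomes a Variable inside one nested scope, just as in the iterate implementation above, and each rule's output is fed back into its head relation with add. The names here are assumptions; the checked-in dataflog.rs is the authoritative version.

// Sketch of the scaffolding around the six rules above (names are assumptions;
// see src/examples/dataflog.rs for the real thing).
scope.scoped(|inner| {
    // recursive relations become variables, seeded from their base facts.
    let p = Variable::from(p_base.enter(inner));
    let q = Variable::from(q_base.enter(inner));

    // non-recursive relations just enter the nested scope.
    let c = c_base.enter(inner);
    let r = r_base.enter(inner);
    let s = s_base.enter(inner);
    let u = u_base.enter(inner);

    // ... build ir1 .. ir6 exactly as in the listing above, each followed by
    // p.add(&irN) or q.add(&irN), and `leave` whatever you want outside ...
});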

Future directions

One of the coolest things about Datalog, and something not really discussed in the Big Datalog paper, is that you get presented with possibly cyclic joins. Cyclic joins are not iterative joins (well, they may be) nor mutually recursive joins (well, they may be), but rather joins involving multiple relations whose constraints fold back on themselves.

For example, the triangles query presented as Program 4 in the Big Datalog paper:

triangles(X, Y, Z) := arc(X, Y), X < Y, arc(Y, Z), Y < Z, arc(Z, X).

This is a very cool program. I'm willing to bet very many donuts that Big Datalog implements this join horribly. They didn't present any measurements, other than that their code is 25x shorter than the corresponding GraphX program. What they don't tell you is that GraphX will probably execute triangle counting 25x more efficiently than they will (something I never thought I would type), because it has a smarter algorithm for doing so. If you naïvely first evaluate the join arc(X, Y), X < Y, arc(Y, Z), which I'm guessing Big Datalog would do, you may already have produced way more tuples than you could possibly produce as output (maybe 25x, though I made that up), something observed a long time ago for triangles and more recently for general cyclic joins.

As far as I am aware, LogicBlox is the only Datalog system with support for worst-case optimal join processing. This means a different execution strategy for processing the join, smarter than just repeatedly doing binary joins. Instead, worst-case optimal join processing typically goes attribute at a time, and can do asymptotically less work than naïve binary join execution, for cyclic queries at least.

There are a few implementations of worst-case optimal join processing, including a high-performance implementation from Stanford and a timely dataflow implementation I wrote up. The issue with Datalog is that the underlying collections that you join evolve; you could re-run the fancy join algorithms each time they do, but this ends up spending relatively large amounts of time processing the data if the changes are small (still gotta do it, though).

Or you could implement the worst-case optimal join algorithms in differential dataflow.

In differential dataflow

Since I know you like Rust macros, I thought I'd show you the GenericJoin algorithm of Ngo et al, written in differential dataflow, in a Rust macro:

macro_rules! join {
    ($source:ident : $help:ident : $(($other:ident, $key:expr, $recons:expr, $index:expr)),*) => {{

        // start each prefix with a large count and meaningless identifier
        let mut counts = $source.map(|(x,w)| ((x, (1 << 30, 1 << 30)), w));

        $( // for each other relation, determine the count of its extensions, then join against the counts so far.
            let temp = counts.map(|(p,ci)| ($key(p).0, ($key(p).1,ci)));
            counts = $other.map(|(p,_e)| (p,()))
                           .group_u(|_k,s,t| t.push(((s.next().unwrap().1, $index),1)))
                           .join_map_u(&temp, |(k,c1,(r,c2))| ($recons(k,r), if c1.0 < c2.0 { c1 } else { c2 }));
        )*

        // we need some help to name an empty collection in the macro
        let mut proposals = $help;
        $(
            // expand appropriate prefixes, add to proposals.
            proposals = counts.filter(|&((_p,(_c,i)),_w)| i == $index)
                              .map(|(p,_)| $key(p))
                              .join_map_u(&$other, |(k,r,e)| ($recons(k,r),e))
                              .concat(&proposals);
        )*

        $(
            // intersect each proposal with all relations
            proposals = proposals.map(|(p,e)| (($key(p).0,e),$key(p).1))
                                 .semijoin(&$other)
                                 .map(|((k,e), r)| ($recons(k,r),e));
        )*

        proposals
    }}
}

Obviously this isn't particularly helpful, but it is basically a differential dataflow translation of the timely dataflow implementation linked above.

I'm not sure what your expectation was, but this actually ends up going faster than naïve binary joins written in differential dataflow, and faster than LogicBlox's worst-case optimal join, when we evaluated it on John's query from up above:

p(X,Z)   :- p(X,Y), p(Y,Z)
q(X,Y,Z) :- p(X,W), q(W,Y,Z)
q(X,P,W) :- q(X,R,W), r(R,P)
p(X,W)   :- p(X,Y), p(X,Z), c(Y,Z,W)
p(X,W)   :- q(X,R,Y), p(Y,Z), u(R,Z,W)
q(X,Y,Z) :- q(X,R,W), q(W,P,Z), t(R,P,Y)

On our data, the fourth rule in there is the real killer, and it is cyclic. If you do the binary join p(X,Y), p(X,Z) first, you produce about two billion tuples, most of which get thrown away when they get checked against c(Y,Z,W). There are better binary join orderings to use, but the Ngo et al algorithm gets to choose between them on a tuple-by-tuple basis, and ends up about 3x faster than the best fixed plan for that fourth rule.
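For concreteness, the plan to avoid looks roughly like the following sketch, written with the relation names from John's listing; it materializes every (Y,Z) pair sharing an X before c(Y,Z,W) gets a chance to prune anything:

// The plan to avoid for the fourth rule: self-join p on X first, materializing
// roughly two billion ((Y,Z), X) tuples, and only then consult c(Y,Z,W).
let bad_plan = p.join_map_u(&p, |&x, &y, &z| ((y, z), x))
                .join_map(&c.map(|(y, z, w)| ((y, z), w)), |_yz, &x, &w| (x, w));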

In the future

The issue with the differential dataflow implementation above is that it keeps too much state in memory. It retains the trace of the computation, because that is how differential dataflow works, and that trace can be as large as (or larger than) the output of the computation. It would have been nice if the in-memory state was only as large as the relations we started with.

So, we've done that too (this is the bit with folks from Waterloo). Watch this space.