2018-05-06.md

Let's build a query processor!

In this post we'll build a query processor from start to finish, using differential dataflow.

What I mean by that is "a differential dataflow program that can run queries", rather than writing and compiling new differential dataflow programs for each new query you might have thought of. Rather than minutes to compile things (blame Rust!) it should take a millisecond or so to create and install a new computation, followed by probably somewhat more work than in a compiled program.

We are building something between compiled code and an interpreter: a program that builds dataflows in response to program input, but builds the dataflows out of pre-defined operators. It's a bit how a database query execution layer works!

This post will be partly walking through some of the moving parts in differential dataflow, so that you can see them and see how they work, and also partly me having to admit which things are gross and hard to do, so that I can be shamed into fixing them.

Here is the outline:

  1. Define a little language (borrowed from Graspan).
  2. Define some Rust types to help us define queries.
  3. Building iterative differential dataflows.
  4. Optimizing differential dataflows.

A little language

Let's start by describing a small query language, which comes courtesy of the Graspan paper. Graspan is a system built to analyze large code bases, by transforming the code bases into directed graphs that describe the flow of data through variable bindings. The Graspan authors built a new system to be able to handle their queries, because apparently SociaLite bombs hard, rarely terminating (much like my browser when I click on SociaLite's "Quick Start" link).

The Graspan setting starts from some base "edges", extracted from a program's source. These edges have a few different semantic flavors, and you could imagine adding new ones if you have further interests. For our purposes they are just pairs of integers labeled with a "flavor" (a string).

Examples include (quoted from their Section 2.2):

  • Dereference edge (D): for each dereference *a, there is a D-edge from a to *a; there is also an edge from an address-of expression &a to a because a is a dereference of &a.

  • Assignment edge (A): for each assignment a = b, there is an A-edge from b to a; a and b can be arbitrary expressions.

We can think of both D and A as sets of edges on some common set of nodes. In this sense, each is a bit like a graph, and one could imagine following paths in this graph.

This is exactly where the Graspan folks go next, defining queries as a collection of rules that follow paths and populate new sets of edges, which can themselves be used in the rules to add even more edges. Furthermore, these rules can even be recursive, which is handy for defining unboundedly long paths.

Roughly, from sets of edges A, B, .. Z, one can define paths by the sequence of edges you need to follow. For example, A B would mean "first take an A edge, then take a B edge". We can make this more crisp by explicitly defining the elements in A B (for any two edge sets):

A B  :=  { (x,z) | exists y s.t. (x,y) in A, (y,z) in B }

Another way to think of A B is by putting lower case variables between the upper case variables: x A y B z.
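To make the definition of A B concrete, here is a minimal sketch in plain Rust, with no differential dataflow involved. The `Edge` alias, the `compose` function, and the sample data are made up for illustration; this is a naive nested loop, not how the post's dataflow computes joins.

```rust
use std::collections::BTreeSet;

// Hypothetical edge type for this sketch: a pair of node identifiers.
type Edge = (u32, u32);

/// Composes two edge sets: `(x, z)` is in the result whenever some `y`
/// has `(x, y)` in `a` and `(y, z)` in `b`.
fn compose(a: &BTreeSet<Edge>, b: &BTreeSet<Edge>) -> BTreeSet<Edge> {
    let mut result = BTreeSet::new();
    for &(x, y) in a {
        for &(y2, z) in b {
            if y == y2 {
                result.insert((x, z));
            }
        }
    }
    result
}

fn main() {
    let a: BTreeSet<Edge> = [(1, 2), (1, 3)].iter().cloned().collect();
    let b: BTreeSet<Edge> = [(2, 4), (3, 4), (5, 6)].iter().cloned().collect();
    // Both 1 -> 2 -> 4 and 1 -> 3 -> 4 exist, but a set keeps (1, 4) once.
    println!("{:?}", compose(&a, &b)); // prints {(1, 4)}
}
```

Using a set for the result is what makes the output "distinct": two different middle nodes `y` leading to the same `(x, z)` contribute one pair, not two.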

The Graspan folks make this a bit more interesting by adding some other connectives. They use ? to mean "optionally", they use ~ to mean "reverse edge set" and they use ( .. )* for the Kleene star, indicating zero or more repetitions of whatever is inside the parentheses.

Here are some examples, again from Graspan's Section 2.2:

VF  ::==  (A MA?)*
MA  ::==  ~D VA D
VA  ::==  ~VF MA? VF

In their example, VF is "value flow", MA is "memory aliasing", and VA is "value aliasing". I'm not personally clever enough to confirm whether these rules are smart for program analysis, but they are certainly interesting from a data processing point of view.

Making the language littler

To approach this problem, we'll make the language a little simpler. I'm just going to say where we end up, and leave it as a bit of an exercise to confirm that this language is similarly expressive.

Our query language will be a list of productions, where each production has the form:

R0  ::==  R1 R2 .. Rk

There is just one relation named on the left side, and the relations on the right side can be either relations or transposed relations (prefixed with ~). The relations on the right side can occur multiple times, and can include the relation named on the left side (which allows us to define recursion). The same relation can be named in the left side of several distinct productions, and each production contributes to the elements of the left side relation.

This is a bit like the language the Graspan folks get to in their Section 3, but we don't allow empty productions (epsilon, in their framing). Instead, relations R that contain the empty production are replaced by ones R+ that do not, and all productions that use such a relation are replaced by two productions: i. one using the positive relation and ii. one not using it. Ultimately, the property "contains the empty production" is a static property of a relation in a query, and one whose complexity we can compile out.

To reproduce the query up above, we would write

VF+  ::==  A
VF+  ::==  A MA+
VF+  ::==  A VF+
VF+  ::==  A MA+ VF+
MA+  ::==  ~D D
MA+  ::==  ~D VA+ D
VA+  ::==  ~VF+
VA+  ::==  ~VF+ MA+
VA+  ::==  ~VF+ VF+
VA+  ::==  ~VF+ MA+ VF+
VA+  ::==  MA+
VA+  ::==  MA+ VF+
VA+  ::==  VF+

That is a bit wordier, but tough beans. It describes the actual joins we will need to do in the computation.

Also, it's all going to be automated from here out anyhow.
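As a rough illustration of that automation, here is a sketch of the expansion for one rule. It assumes we have already worked out that every term on the right-hand side of the VA rule can be empty (VF because of the Kleene star, MA because of the ?), so the epsilon-free productions for VA+ are exactly the non-empty subsequences of `~VF+ MA+ VF+`. The function name `nonempty_productions` is hypothetical, and the general "which relations can be empty" analysis is not shown.

```rust
/// Enumerates every non-empty subsequence of `terms`, preserving term order.
/// Each term is assumed to be droppable (it could match the empty path).
fn nonempty_productions(terms: &[&str]) -> Vec<Vec<String>> {
    let mut out = Vec::new();
    // Bit `i` of `mask` says whether term `i` is kept; mask 0 (the empty
    // production) is skipped on purpose.
    for mask in 1u32..(1u32 << terms.len()) {
        let kept: Vec<String> = terms
            .iter()
            .enumerate()
            .filter(|&(i, _)| ((mask >> i) & 1) == 1)
            .map(|(_, term)| term.to_string())
            .collect();
        out.push(kept);
    }
    out
}

fn main() {
    // Emits seven productions for VA+ (in bitmask order, which differs
    // from the order used in the listing above).
    for rhs in nonempty_productions(&["~VF+", "MA+", "VF+"]) {
        println!("VA+  ::==  {}", rhs.join(" "));
    }
}
```

Three droppable terms give 2^3 - 1 = 7 productions, which matches the seven VA+ lines in the rewritten query.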

Some helpful Rust types to define queries

Let's create a few Rust types that should help us in our quest.

First, we will just use String to identify relations. This is going to be super easy.

Next, let's create a simple enum to identify relations as transposed or not.

/// A direction we might traverse an edge.
enum Relation {
    /// The forward direction of the relation, x -> y.
    Forward(String),
    /// The reverse direction of the relation, x <- y.
    Reverse(String),
}

Now whenever we talk about a "relation" we can just use the Relation type, which encodes the name of the edge set and also which direction we would like to follow the edges.

With the Relation type in hand, let's describe a type representing a production: one of those rules up there that describe one way we might find edges to add to a relation.

/// A rule for producing edges based on paths.
///
/// The intent is that we should add to the left hand relation any pair `(x,y)` for which
/// there exists a path x -> z0 -> z1 -> .. -> y through the specified relations.
struct Production {
    /// The name of the edge set to populate.
    pub left_hand: String,
    /// A sequence of edges to follow.
    pub relations: Vec<Relation>,
}

Finally, a query will just be a sequence of productions.

struct Query {
    /// A sequence of productions.
    pub productions: Vec<Production>,
}

We will use the convention that empty productions still result in a named relation, but don't actually produce any pairs for the relation. This will let us name and use input relations without doing anything else special.

Building differential dataflows

Alright! Moment of truth. We have a query or two at hand, and we would like to build a differential dataflow that will compute some answers for us.

Our rough dataflow structure will be:

  1. For each named relation define a differential dataflow input, into which we can add (and from which we can remove) edges.
  2. We'll create a new iterative scope in which we can create recursively defined collections.
  3. For each named relation create a differential dataflow Variable, which allows us to define a collection in terms of computations involving itself and other Variables. We set each variable to be initially the collection defined by the associated input.
  4. For each production, repeatedly use join to produce edges that should be added to the target relation.
  5. For each named relation set its definition to be the distinct edges from its initial inputs and productions.

That's all, but there will be some horrible type gunk along the way.

Starting out

Let's try and frame the logic we want to build. We'll add a method to the Query type that should allow us to build a dataflow from the query, in a supplied scope, and returning input handles that allow us to mess around with the relations. Here is what it looks like:

impl Query {

    /// Creates a dataflow implementing the query, and returns input handles.
    pub fn build_in<'a, A, T>(&self, scope: &mut Child<'a, Root<A>, T>) -> HashMap<String, RelationHandles<T>>
    where A : Allocate,
          T : Timestamp+Lattice {
        // ...
    }
}

Yeah, ok. Ok! It's not gorgeous. The build_in method needs to know about the type of the dataflow scope in which we will build the query, and that type is a mouthful. Unfortunately we can't (currently) go and put differential dataflow inputs wherever we want, and the particular type of scope that supports these inputs is a Child<'a, Root<A>, T> which involves three generic parameters. Barf. I recommend you think of T as usize, and that you don't think of 'a or A at all.

The return type of the method is a HashMap<String, RelationHandles<T>>, which is a map from relation names to a "relation handle", which is a pair of input handle (for changing the input) and trace handle (for viewing the output). I'm not certain you really want to know what these look like, but in case I'm wrong and you do, here they are:

use differential_dataflow::input::InputSession;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::trace::implementations::ord::OrdKeySpine;

type TraceKeyHandle<K,T,R> = TraceAgent<K, (), T, R, OrdKeySpine<K, T, R>>;

pub struct RelationHandles<T: Timestamp+Lattice> {
    pub input: InputSession<T, Edge, isize>,
    pub trace: TraceKeyHandle<Edge, Product<RootTimestamp, T>, isize>,
}

So yeah. Sorry about all of this. Ideally it will look better when we use it! :D

Defining recursive collections

Before we go too far, I'm going to introduce one last helper type.

We will be using differential dataflow's Variable type to help us build up mutually recursive rules, but it is a bit more flexible than we need. The Variable type allows us to define a collection whose contents change arbitrarily in each iteration, and we aren't going to need to change our collection arbitrarily, but rather just add tuples to the collection if they don't already exist.

Let's build a slightly simpler interface to Variable, an EdgeVariable. This variable can be created from an initial collection, it can have other collections added in using add_production, and by calling complete we finalize its definition.

/// A wrapper for an evolving set of edges.
///
/// This wrapper supports the addition of new productions, and provides access to indexed representations of the forward and reverse
/// edge directions.
pub struct EdgeVariable<'a, G: Scope> where G::Timestamp : Lattice {
    variable: Variable<'a, G, Edge, isize>,
    pub current: Collection<Child<'a, G, u64>, Edge, isize>,
}

impl<'a, G: Scope> EdgeVariable<'a, G> where G::Timestamp : Lattice {
    /// Create a new edge variable from an initial set of values.
    pub fn from(source: &Collection<Child<'a, G, u64>, Edge>) -> Self {
        EdgeVariable {
            variable: Variable::from(source.filter(|_| false)),
            current: source.clone(),
        }
    }
    /// Introduce a new and potentially recursive production of edges.
    pub fn add_production(&mut self, production: &Collection<Child<'a, G, u64>, Edge, isize>) {
        self.current = self.current.concat(production);
    }
    /// Finalizes the variable, connecting its recursive definition.
    ///
    /// Failure to call `complete` on a variable results in a non-recursively defined
    /// collection, whose contents are just its initial `source` data.
    pub fn complete(self) {
        self.variable.set(&self.current.distinct());
    }
}

You may not entirely understand how Variable works internally, and this is fine because differential dataflow is mysterious in its ways, but the short version is that if we create an EdgeVariable and repeatedly call add_production, we will end up with a collection that in each round contains the distinct set of all facts produced by the productions.
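To make "in each round contains the distinct set of all facts" concrete, here is a plain-Rust sketch of the same idea for a single recursive production `r ::== r e`. It involves no differential dataflow; the function name `fixed_point` and the sample data are made up, and it recomputes whole rounds where differential dataflow would only process changes.

```rust
use std::collections::BTreeSet;

// Hypothetical edge type for this sketch.
type Edge = (u32, u32);

/// Naively evaluates `r ::== r e` to a fixed point. Each round is the
/// initial `source` facts plus whatever the production derives from the
/// previous round, which mirrors "source, concatenated with productions,
/// then made distinct".
fn fixed_point(source: &BTreeSet<Edge>, e: &BTreeSet<Edge>) -> BTreeSet<Edge> {
    let mut current = source.clone();
    loop {
        let mut next = source.clone();
        for &(x, y) in &current {
            for &(y2, z) in e {
                if y == y2 {
                    next.insert((x, z));
                }
            }
        }
        // A set is already distinct; stop once a round changes nothing.
        if next == current {
            return current;
        }
        current = next;
    }
}

fn main() {
    let source: BTreeSet<Edge> = [(0, 0)].iter().cloned().collect();
    let e: BTreeSet<Edge> = [(0, 1), (1, 2), (2, 0), (3, 4)].iter().cloned().collect();
    // Node 0 reaches 0, 1, and 2 around the cycle; the edge 3 -> 4 is unreachable.
    println!("{:?}", fixed_point(&source, &e)); // prints {(0, 0), (0, 1), (0, 2)}
}
```

The loop terminates even though the edges contain a cycle, because re-deriving a fact that is already present does not change the set.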

The Variable struct is actually a pretty helpful way to set up arbitrary mutually recursive computations, even those involving collections that do not increase monotonically. Maybe we'll talk about it in a later post!

Assembling recursive dataflows

With this taken care of, let's take a peek at what the body of Query::build_in() looks like.

It all starts pretty harmlessly, by creating a nested subscope and preparing some hash maps.

        // Create new input (handle, stream) pairs.
        let mut input_map = HashMap::new();
        for production in self.productions.iter() {
            input_map.entry(production.left_hand.clone())
                     .or_insert_with(|| scope.new_collection());
        }

        // Create nested dataflow scope.
        scope.scoped(|subscope| {

            let mut result_map = HashMap::new();    // from name to RelationHandles.
            let mut variable_map = HashMap::new();  // from name to EdgeVariable.

            // 1. TODO: populate `result_map` and `variable_map`.
            // 2. TODO: add productions based on `self`.

            // complete each variable, return `result_map`.
            for (_name, variable) in variable_map.drain() {
                variable.complete();
            }

            result_map
        })

Already we can see that the plan is to hold on to maps to our RelationHandles types and to our EdgeVariable types. The former map is what we are going to return from the method, so that makes sense. The latter map is to scope-local EdgeVariable types and is not something we can return (it contains references to the nested scope's lifetime; many goats were sacrificed to Rust to permit this).

There are two further steps, populating these two maps, and then adding productions based on the query contents.

The first step mostly boils down to draining input_map, which has one entry for each named relation, and building up new EdgeVariable instances inside of subscope. Once we build an edge variable we hit it with leave to pop it out of the loop and then arrange_by_self to grab an instance of it we can export. This second part doesn't have to make sense, but roughly we are extracting a form of the trace that is pure data, with no timely dataflow hooks nor Rust lifetimes in it.

            // Populate `result_map` and `variable_map`.
            for (name, (input, collection)) in input_map.drain() {
                let edge_variable = EdgeVariable::from(&collection.enter(subscope));
                let trace = edge_variable.variable.leave().arrange_by_self().trace;
                result_map.insert(name.clone(), RelationHandles { input, trace });
                variable_map.insert(name.clone(), edge_variable);
            }

Other than the apparently random methods that get called on things, this wasn't all that hard, right? :D

Right? :D

:D

The next step is to use variable_map and self (which is a Query, recall) to add productions for each of those variables based on all the relations in each production. We do this production-by-production, doing a bunch of joins and then calling add_production. The only non-obvious thing is that as we form a partial path x, z0, .., zi we want to maintain (x,zi) with fields reversed as (zi,x), with the path destination as the key, because the destination is what we'll use in the join to link up with the next edge (zi,zj). At the end, we'll need to remember to flip the edge around and add (x,y) rather than (y,x).

            // For each rule, add to the productions for the relation.
            for production in self.productions.iter() {

                // We need to start somewhere; ignore empty rules.
                if production.relations.len() > 0 {

                    // We'll track the path transposed, so that it is indexed by *destination* rather than source.
                    let mut transposed = match &production.relations[0] {
                        Relation::Forward(name) => variable_map[name].variable.map(|(x,y)|(y,x)),
                        Relation::Reverse(name) => variable_map[name].variable.clone(),
                    };

                    // Repeatedly join with sequence of relations, transpose.
                    for relation in production.relations[1..].iter() {
                        let to_join = match relation {
                            Relation::Forward(name) => variable_map[name].variable.clone(),
                            Relation::Reverse(name) => variable_map[name].variable.map(|(x,y)|(y,x)),
                        };

                        transposed = transposed.join_map(&to_join, |_k,&x,&y| (y,x));
                    }

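                    // Reverse the direction before adding it as a production.
                    // Indexing a `HashMap` only gives a shared reference, so use `get_mut`.
                    variable_map
                        .get_mut(&production.left_hand)
                        .expect("left-hand relation missing from variable_map")
                        .add_production(&transposed.map(|(dst,src)| (src,dst)));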
                }
            }
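The key-by-destination bookkeeping can be checked in isolation. Below is a sketch in plain Rust, using an in-memory `HashMap` as a stand-in for the join; the `extend` function and the sample edges are hypothetical, and nothing here uses differential dataflow's actual operators.

```rust
use std::collections::HashMap;

// Hypothetical node type for this sketch.
type Node = u32;

/// Extends partial paths by one edge. `transposed` holds `(dst, src)` pairs
/// and `edges` holds `(from, to)` pairs; the output is again `(dst, src)`,
/// so it is ready to be extended by the next relation.
fn extend(transposed: &[(Node, Node)], edges: &[(Node, Node)]) -> Vec<(Node, Node)> {
    // Index edges by their source, the key they share with a path's destination.
    let mut index: HashMap<Node, Vec<Node>> = HashMap::new();
    for &(from, to) in edges {
        index.entry(from).or_default().push(to);
    }
    let mut out = Vec::new();
    for &(dst, src) in transposed {
        if let Some(nexts) = index.get(&dst) {
            for &next in nexts {
                // The new destination becomes the key; the origin rides along.
                out.push((next, src));
            }
        }
    }
    out
}

fn main() {
    let a: Vec<(Node, Node)> = vec![(1, 2)];
    let b: Vec<(Node, Node)> = vec![(2, 3)];
    let c: Vec<(Node, Node)> = vec![(3, 4), (3, 5)];

    // Start from the first relation, transposed so the destination is the key.
    let start: Vec<(Node, Node)> = a.iter().map(|&(x, y)| (y, x)).collect();
    let after_b = extend(&start, &b);
    let after_c = extend(&after_b, &c);

    // Flip back to (src, dst) before treating the pairs as edges.
    let paths: Vec<(Node, Node)> = after_c.iter().map(|&(dst, src)| (src, dst)).collect();
    println!("{:?}", paths); // prints [(1, 4), (1, 5)]
}
```

Keeping the pair transposed between steps means every step joins on the first field, so no per-step re-keying is needed until the final flip.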

That's it! Like, that's the whole build_in method. Next let's try using it!

Running it!

Here is a program that reads a Graspan query, then loads some Graspan data, then runs everything. I appear to have left out some of the query parsing code, but you can read all about it in the repository.

use std::fs::File;
use std::io::{BufRead, BufReader};

fn main() {

    timely::execute_from_args(std::env::args(), move |worker| {

        let timer = ::std::time::Instant::now();

        let peers = worker.peers();
        let index = worker.index();

        let query_filename = std::env::args().nth(1).expect("Argument 1 (query filename) missing.");
        let query_text = std::fs::read_to_string(query_filename).expect("Failed to read query file");
        let query = Query::build_from(query_text.lines());

        let mut relation_map = worker.dataflow::<(),_,_>(|scope| query.build_in(scope));

        if index == 0 { println!("{:?}:\tDataflow assembled for {:?}", timer.elapsed(), query); }

        // Build a dataflow to report final sizes.
        worker.dataflow(|scope| {
            for (name, data) in relation_map.iter_mut() {
                let name = name.to_string();
                data.trace
                    .import(scope)
                    .as_collection(|&_kv,&()| ())
                    .consolidate()
                    .inspect(move |x| println!("{:?}\tfinal size of relation '{}': {:?}", timer.elapsed(), name, x.2));
            }
        });

        // snag a filename to use for the input graph.
        let data_filename = std::env::args().nth(2).expect("Argument 2 (data filename) missing.");
        let file = BufReader::new(File::open(data_filename).expect("Failed to read data file"));

        for readline in file.lines() {
            let line = readline.ok().expect("read error");
            if !line.starts_with('#') && line.len() > 0 {
                let mut elts = line[..].split_whitespace();
                let src = elts.next().expect("data line with no src (1st) element").parse().expect("malformed src");
                if (src as usize) % peers == index {
                    let dst = elts.next().expect("data line with no dst (2nd) element").parse().expect("malformed dst");
                    let val: &str = elts.next().expect("data line with no val (3rd) element");
                    if let Some(handle) = relation_map.get_mut(val) {
                        handle.input.insert((src, dst));
                    }
                    else {
                        panic!("couldn't find the named relation: {:?}", val);
                    }
                }
            }
        }

        if index == 0 { println!("{:?}:\tData loaded", timer.elapsed()); }

    }).expect("Timely computation did not complete cleanly");
}

We can run this with one of the input datasets graciously provided by the Graspan team (thank you, Graspan team!), the nodes (n) and edges (e) relations for httpd. We are going to start with the dataflow query, which looks like

Echidnatron% cat dataflow.query
n n e
e
Echidnatron%

This means that, starting from the n (node) facts, following unboundedly many e (edge) edges produces new n facts. There are also some e facts, but no rules produce new ones.
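Since the parsing code is left out of the post, here is a guess at what parsing one line of such a file could look like, inferred only from the file above and the printed `Production` values. The function `parse_production` is hypothetical (it is not the repository's `Query::build_from`), and treating a leading `~` as "reverse" in the file format is an assumption carried over from the query language.

```rust
#[derive(Debug, PartialEq)]
enum Relation {
    Forward(String),
    Reverse(String),
}

#[derive(Debug, PartialEq)]
struct Production {
    left_hand: String,
    relations: Vec<Relation>,
}

/// Parses one whitespace-separated line: the first token names the relation
/// to populate, and the remaining tokens are the relations to follow.
/// Returns `None` for a blank line.
fn parse_production(line: &str) -> Option<Production> {
    let mut tokens = line.split_whitespace();
    let left_hand = tokens.next()?.to_string();
    let relations = tokens
        .map(|token| {
            // Assumption: a leading `~` marks a transposed relation.
            if token.starts_with('~') {
                Relation::Reverse(token[1..].to_string())
            } else {
                Relation::Forward(token.to_string())
            }
        })
        .collect();
    Some(Production { left_hand, relations })
}

fn main() {
    for line in "n n e\ne".lines() {
        println!("{:?}", parse_production(line));
    }
}
```

A line with only a name, such as `e`, yields an empty production, which fits the earlier convention that empty productions name a relation without producing any pairs.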

Echidnatron% cargo run --release --example graspan -- ./dataflow.query ~/Desktop/graspan/httpd_df.dms
    Finished release [optimized + debuginfo] target(s) in 82.16 secs
     Running `target/release/examples/graspan /Users/mcsherry/Desktop/graspan/httpd_df.dms`
Rendering: Query {
    productions: [
        Production { left_hand: "n", relations: [Forward("n"), Forward("e")] },
        Production { left_hand: "e", relations: [] },
    ]
}
Duration { secs: 0, nanos: 1886966 }:   Dataflow assembled
Duration { secs: 1, nanos: 854102052 }: Data loaded
Duration { secs: 11, nanos: 271388060 } final size of relation 'e': 9905624
Duration { secs: 28, nanos: 778159674 } final size of relation 'n': 9393283
Echidnatron%

We can see a few things here.

  1. Compiling differential dataflow computations takes a while. It takes 82.16 seconds to compile this program, which isn't itself wildly complicated (the differential dataflow operators are distinct, join, and concat, once each).

    In part, these compile times are why we are doing our computation this way: we would like to have one program that can interpret queries supplied as text, rather than have to recompile our computation each time the query changes, with delays of multiple minutes vs the 1.89ms it takes to build the dataflow above.

  2. The computation reports the number of edges earlier than it reports the number of nodes, by a lot.

    Differential dataflow, like timely dataflow below it, is super smart and can determine that there will be no more changes to the edges relation by virtue of the dataflow graph: the only collections leading into the definition of edges are the edge inputs themselves, and once all the work is done there it is ready to report.

  3. The computation takes about 30 seconds total, which is a number.

    For comparison, the Graspan paper reports taking 11.4 minutes to perform this query. If you use Datafrog it takes about 3 seconds (after the data loading). So it's in the middle of the pack. Our computation is poised to support arbitrary updates to the input relations, something Datafrog isn't able to deal with.

Optimizing differential dataflows

We have a few structural inefficiencies in our implementation up above, and let's take a moment to iron some of them out. They aren't going to help in the simple example we've been using so far, but they do help in other computations.

Recall that we form our productions like so

    // Repeatedly join with sequence of relations, transpose.
    for relation in production.relations[1..].iter() {
        let to_join = match relation {
            Relation::Forward(name) => variable_map[name].variable.clone(),
            Relation::Reverse(name) => variable_map[name].variable.map(|(x,y)|(y,x)),
        };

        transposed = transposed.join_map(&to_join, |_k,&x,&y| (y,x));
    }

which is to say we repeatedly join whatever collection we have associated with a name. This results in re-indexing these to_join relations each time they are used, whereas we really only want to index them once per way they are used: either indexed by their first field or by their second field.

We can do this with an optimized form of the EdgeVariable struct which maintains indexed forms of the forward and reverse relations:

pub struct EdgeVariableOpt<'a, G: Scope> where G::Timestamp : Lattice {
    variable: Variable<'a, G, Edge, isize>,
    current: Collection<Child<'a, G, u64>, Edge, isize>,
    forward: Option<Arrange<Child<'a, G, u64>, Node, Node, isize>>,
    reverse: Option<Arrange<Child<'a, G, u64>, Node, Node, isize>>,
}

I've made the forward and reverse members be Option<_> types so that they can be None initially, and created only when we actually require them (to avoid building and maintaining indices we do not need):

impl<'a, G: Scope> EdgeVariableOpt<'a, G> where G::Timestamp : Lattice {
    /// The collection arranged in the forward direction.
    pub fn forward(&mut self) -> &Arrange<Child<'a, G, u64>, Node, Node, isize> {
        if self.forward.is_none() {
            self.forward = Some(self.variable.arrange_by_key());
        }
        self.forward.as_ref().unwrap()
    }
    /// The collection arranged in the reverse direction.
    pub fn reverse(&mut self) -> &Arrange<Child<'a, G, u64>, Node, Node, isize> {
        if self.reverse.is_none() {
            self.reverse = Some(self.variable.map(|(x,y)| (y,x)).arrange_by_key());
        }
        self.reverse.as_ref().unwrap()
    }
}

Now when we want to use a relation, we can instead write

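    for relation in production.relations[1..].iter() {
        let to_join = match relation {
            // `forward` and `reverse` take `&mut self`, so we need `get_mut` here.
            Relation::Forward(name) => variable_map.get_mut(name).unwrap().forward(),
            Relation::Reverse(name) => variable_map.get_mut(name).unwrap().reverse(),
        };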

        transposed = transposed.join_core(to_join, |_k,&x,&y| Some((y,x)));
    }

which uses the lower-level join_core method that allows you to supply pre-indexed arranged data, rather than un-indexed collections. This allows us to re-use a variable multiple times without needing to maintain multiple copies in memory. For example, we could track all paths along five hops of edges:

nodes  ::==  nodes edges edges edges edges edges

without maintaining five indexed copies of edges in memory, as we would have otherwise.
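The "build each index at most once, and only if asked" pattern can be sketched without differential dataflow. In the sketch below a `HashMap` stands in for an arrangement; the `IndexedEdges` type and its `builds` counter are hypothetical and exist only to show that repeated uses share one index.

```rust
use std::collections::HashMap;

// Hypothetical types for this sketch.
type Node = u32;
type Index = HashMap<Node, Vec<Node>>;

/// Builds an index from the first field of each pair to its second fields.
fn index_by_first(pairs: &[(Node, Node)]) -> Index {
    let mut index = Index::new();
    for &(key, val) in pairs {
        index.entry(key).or_default().push(val);
    }
    index
}

struct IndexedEdges {
    edges: Vec<(Node, Node)>,
    forward: Option<Index>,
    reverse: Option<Index>,
    /// Counts how many indexes were actually built.
    builds: usize,
}

impl IndexedEdges {
    fn new(edges: Vec<(Node, Node)>) -> Self {
        IndexedEdges { edges, forward: None, reverse: None, builds: 0 }
    }
    /// Edges indexed by source, built on first use.
    fn forward(&mut self) -> &Index {
        if self.forward.is_none() {
            self.builds += 1;
            self.forward = Some(index_by_first(&self.edges));
        }
        self.forward.as_ref().unwrap()
    }
    /// Edges indexed by destination, built on first use.
    fn reverse(&mut self) -> &Index {
        if self.reverse.is_none() {
            self.builds += 1;
            let flipped: Vec<(Node, Node)> = self.edges.iter().map(|&(x, y)| (y, x)).collect();
            self.reverse = Some(index_by_first(&flipped));
        }
        self.reverse.as_ref().unwrap()
    }
}

fn main() {
    let mut edges = IndexedEdges::new(vec![(1, 2), (2, 3)]);
    // Five uses of the forward direction, as in the five-hop production.
    for _ in 0..5 {
        let _ = edges.forward();
    }
    println!("indexes built: {}", edges.builds); // prints "indexes built: 1"
}
```

The reverse index is never built in this run because nothing asked for it, which is the point of making both fields `Option`.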

While this may not be very exciting for the query above, this optimization has some advantages for that complicated query we have way back up there:

VF+  ::==  A
VF+  ::==  A MA+
VF+  ::==  A VF+
VF+  ::==  A MA+ VF+
MA+  ::==  ~D D
MA+  ::==  ~D VA+ D
VA+  ::==  ~VF+
VA+  ::==  ~VF+ MA+
VA+  ::==  ~VF+ VF+
VA+  ::==  ~VF+ MA+ VF+
VA+  ::==  MA+
VA+  ::==  MA+ VF+
VA+  ::==  VF+

Notice that there are only three recursively defined relations, and two ways each might need to be indexed (forward and reverse), but there are way more than six uses of relations on the right-hand side. In particular, we are re-using some relations that get really quite large (almost billions of records), and so saving a few redundant in-memory representations of each (and the compute to maintain them as they change) can be a big deal.

A most excellent bug

Because this is what I spent this morning doing, ... let's try and run with multiple workers!

Echidnatron% cargo run --release --example graspan -- ./dataflow.query ~/Desktop/graspan/httpd_df.dms -w2
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/graspan /Users/mcsherry/Desktop/graspan/httpd_df.dms -w2`
Rendering: Query {
    productions: [
        Production { left_hand: "n", relations: [Forward("n"), Forward("e")] },
        Production { left_hand: "e", relations: [] },
    ]
}
Duration { secs: 0, nanos: 4417304 }:   Dataflow assembled
Duration { secs: 1, nanos: 812868872 }: Data loaded

Oh, I bet this is going to go really fast! Super excited here...

Duration { secs: 7, nanos: 833838387 }  edges: 9905624
Operator prematurely shut down: Concatenate(tombstone)
  true
  [ChangeBatch { updates: [(((Root, ()), 2), 1)], clean: 1 }, ChangeBatch { updates: [], clean: 0 }]
thread 'worker thread 1' panicked at 'assertion failed: external_progress.iter_mut().all(|x| x.is_empty())', /Users/mcsherry/Projects/timely-dataflow/src/progress/nested/subgraph.rs:856:13
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread 'worker thread 0' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', libcore/result.rs:945:5
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any', libcore/result.rs:945:5
Echidnatron%

Wait. Wut?

Sweet jeebus. Something has gone horribly wrong in the innards of timely dataflow. Some dataflow operator was terminated prematurely, and holy crap how is that possible in user code? Is timely utterly broken? (probably).

It turns out it is a bug in user code. Let me show you:

    for (name, (input, collection)) in input_map.drain() {
        let edge_variable = EdgeVariable::from(&collection.enter(subscope));
        let trace = edge_variable.variable.leave().arrange_by_self().trace;
        result_map.insert(name.clone(), RelationHandles { input, trace });
        variable_map.insert(name.clone(), edge_variable);
    }

Do you see the bug? Let me focus in on the line:

    for (name, (input, collection)) in input_map.drain() {

More clear now? Not really? :)

Timely strongly assumes that workers construct the same dataflow graph, so that operator identifiers can be correlated between workers. In this case, input_map is a HashMap, and the order in which it stores elements depends on a thread-local random number generator. It may not be the same between different workers. So, they might build their inputs in different orders, and "operator zero" might mean something different in one worker than another. When data are exchanged, things may go wrong.

Now, this wouldn't have been a problem if we had different types everywhere, but by the nature of this program all relations are pairs of integers, and so all the types just work out. Oops.

We can fix this with a very silly looking change: we drain and sort the elements, then process them in order.

    let mut input_map = input_map.drain().collect::<Vec<_>>();
    input_map.sort_by(|x,y| x.0.cmp(&y.0));
    for (name, (input, collection)) in input_map.drain(..) {

Ok, that should work great. Let's check it out.

Echidnatron% cargo run --release --example graspan -- ~/Desktop/graspan/httpd_df.dms -w2
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/graspan /Users/mcsherry/Desktop/graspan/httpd_df.dms -w2`
Rendering: Query {
    productions: [
        Production { left_hand: "n", relations: [Forward("n"), Forward("e")] },
        Production { left_hand: "e", relations: [] },
    ]
}
Duration { secs: 0, nanos: 2253966 }:   Dataflow assembled
Duration { secs: 1, nanos: 967227291 }: Data loaded

By the way, really excited to have fixed that bug. These new numbers are going to be great!

thread 'worker thread 1' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', libcore/result.rs:945:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread 'worker thread 0' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', libcore/result.rs:945:5
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any', libcore/result.rs:945:5
Echidnatron%

OH FOR !#$@s SAKE.

It turns out there is another, very similar, bug. This time it is in the calls to complete each variable:

        for (_name, variable) in variable_map.drain() {
            variable.complete();
        }
        result_map
    })

We need to do the same thing again, which is a bit annoying. Instead, here is a Rust crate for an ordered hash map which maintains the insertion order, provides iteration in that order, and (presumably?) drops in the insertion order. I decided to switch to that, possibly permanently.

        for (_name, variable) in variable_map.drain(..) {
            variable.complete();
        }
        result_map
    })
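For what it's worth, the standard library offers another route to a worker-independent order: a `BTreeMap` iterates in key order, so every worker visits relation names in the same sequence regardless of insertion order or hashing. The sketch below compares that with the drain-and-sort fix from earlier; the function names and the `u32` payloads are placeholders, and this is not the change the post actually made (which was an insertion-ordered map crate).

```rust
use std::collections::{BTreeMap, HashMap};

/// The drain-and-sort approach: collect a `HashMap`'s entries and order
/// them by name before processing.
fn sorted_names(map: HashMap<String, u32>) -> Vec<String> {
    let mut entries: Vec<(String, u32)> = map.into_iter().collect();
    entries.sort_by(|x, y| x.0.cmp(&y.0));
    entries.into_iter().map(|(name, _)| name).collect()
}

/// The `BTreeMap` approach: iteration is already in key order.
fn btree_names(map: BTreeMap<String, u32>) -> Vec<String> {
    map.into_iter().map(|(name, _)| name).collect()
}

fn main() {
    // Two "workers" that happen to insert the same names in different orders.
    let mut worker0 = HashMap::new();
    let mut worker1 = BTreeMap::new();
    for (i, name) in ["n", "e", "a"].iter().enumerate() {
        worker0.insert(name.to_string(), i as u32);
    }
    for (i, name) in ["a", "n", "e"].iter().enumerate() {
        worker1.insert(name.to_string(), i as u32);
    }
    // Both approaches visit the names in the same order.
    println!("{:?}", sorted_names(worker0)); // prints ["a", "e", "n"]
    println!("{:?}", btree_names(worker1)); // prints ["a", "e", "n"]
}
```

Either way, the property that matters is that the order depends only on the names, never on per-thread hashing state.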

Don't make this same mistake! #BeBest!