We are building a pretty interesting thing at Materialize, Inc.
You get an interactive SQL shell (for now: psql), you load up streaming sources of relational data (think: the changelog coming out of your RDBMS), and you issue interactive queries against the results. Not only do the results come back quickly, but they are maintained, so that the next time you ask the query they are even faster.
This is all great, and we are going to start talking about it pretty soon.
What I've been doing the past few days is hooking up log streams from timely dataflow and differential dataflow, and presenting them internally as continually updating relations.
What does that mean?
Let's look at an example. If you start up vanilla materialized you can connect to it using psql, like so:
ameisenigel% psql -h localhost -p 6875 sslmode=disable
psql (11.3, server 0.0.0)
Type "help" for help.
mcsherry=>
That's pretty neat already, right? This would be a great time for me to point out that almost all of what you are about to see is the work of someone other than me, mostly the other intrepid folks at Materialize, Inc.
But let's keep going. We can turn on some timing measurements, like so:
mcsherry=> \timing
Timing is on.
mcsherry=>
We are just about ready to start screwing around. But we don't have any data! This is where I come in.
Timely dataflow and differential dataflow both produce timestamped logs of system events: for example, dataflow construction and scheduling in timely dataflow, and state (arrangement) management in differential dataflow. I've just gone and mirrored this data up as continually evolving relations.
For example, let's look at the dataflow operators in the system (some formatting liberties taken to remove UUID guff):
mcsherry=> peek logs_operates;
id | worker | address | name
-----+--------+-----------+------------------------
114 | 0 | [4] | Dataflow
115 | 0 | [4, 1] | UnorderedInput
116 | 0 | [4, 2] | Map
118 | 0 | [4, 3] | Arrange: dual
121 | 0 | [5] | Dataflow
122 | 0 | [5, 1, 1] | ToStream
123 | 0 | [5, 1, 2] | Map
125 | 0 | [5, 1, 3] | Map
127 | 0 | [5, 1, 4] | Arrange: <temp_SNIP>
130 | 0 | [5, 1] | Region
(10 rows)
Time: 6.888 ms
mcsherry=>
There isn't too much to see here. We have two dataflows, each of which is doing some vanilla things (the dual relation is apparently some vestigial relation that databases are assumed to have, much like your appendix).
You might have noticed that the dataflow addresses seem to start with four, and go up from there. Smart! The first several dataflows are pre-installed to process log records, and .. are not themselves logged because you can imagine what goes wrong there.
Let's look at some other log relations:
mcsherry=> peek logs_channels;
id | worker | scope | source node | source port | target node | target port
-----+--------+--------+-------------+-------------+-------------+-------------
117 | 0 | [4] | 1 | 0 | 2 | 0
119 | 0 | [4] | 2 | 0 | 3 | 0
124 | 0 | [5, 1] | 1 | 0 | 2 | 0
126 | 0 | [5, 1] | 2 | 0 | 3 | 0
128 | 0 | [5, 1] | 3 | 0 | 4 | 0
(5 rows)
Time: 9.052 ms
mcsherry=>
These are the channels connecting the dataflow operators. Each channel inhabits a scope, and connects two operators in that scope. Operators may have multiple input and output ports (where edges attach) so clearly we need to write that down too, even if nothing is especially interesting at the moment.
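Putting logs_channels next to logs_operates, the numbers appear to line up: an operator's address looks like its scope's address followed by its node index in that scope. Here is a minimal Rust sketch of that reading, using rows copied from the output above. The helper names (endpoint_address, operator_name, sample_operators) are made up for illustration and are not part of materialized or timely dataflow.

```rust
/// Address of a channel endpoint: the scope's address followed by the node
/// index within that scope. (An assumption read off the tables above.)
fn endpoint_address(scope: &[usize], node: usize) -> Vec<usize> {
    let mut address = scope.to_vec();
    address.push(node);
    address
}

/// Looks up an operator name by address in an `(address, name)` listing.
fn operator_name(
    operators: &[(Vec<usize>, &'static str)],
    address: &[usize],
) -> Option<&'static str> {
    operators
        .iter()
        .find(|(addr, _)| addr.as_slice() == address)
        .map(|(_, name)| *name)
}

/// Rows copied from the `logs_operates` output above (dataflow 4 only).
fn sample_operators() -> Vec<(Vec<usize>, &'static str)> {
    vec![
        (vec![4], "Dataflow"),
        (vec![4, 1], "UnorderedInput"),
        (vec![4, 2], "Map"),
        (vec![4, 3], "Arrange: dual"),
    ]
}
```

Under that assumption, channel 117 (scope [4], source node 1, target node 2) runs from UnorderedInput to Map, and channel 119 runs from Map to Arrange: dual.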
Let's check out a more interesting bit of logging, from differential dataflow:
mcsherry=> peek logs_arrangement;
operator | worker | records | batches
----------+--------+---------+---------
118 | 0 | 1 | 1
127 | 0 | 1 | 1
(2 rows)
Time: 9.962 ms
mcsherry=>
This relation tells us about the maintained arrangements of data, in this case rather small arrangements for the pre-installed dual relation (which has a single element, X). Each arrangement manages some number of logical updates, spread across some other number of physical batches. Ideally the number of batches is logarithmic in the number of records. Seems to check out.
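If you are wondering where "logarithmic" might come from, here is a toy Rust sketch of geometric merging: each new batch is merged into its older neighbour whenever it has grown at least as large. This is not differential dataflow's actual merging code (which is more careful about amortizing work); the Spine type below is hypothetical and tracks only batch sizes.

```rust
/// A toy "spine" that records only the sizes of its batches, oldest first.
struct Spine {
    batches: Vec<usize>,
}

impl Spine {
    fn new() -> Self {
        Spine { batches: Vec::new() }
    }

    /// Adds a batch of `size` records, then merges the two newest batches
    /// for as long as the newest is at least as large as the one before it.
    fn push(&mut self, size: usize) {
        self.batches.push(size);
        while self.batches.len() >= 2 {
            let n = self.batches.len();
            if self.batches[n - 1] >= self.batches[n - 2] {
                let newest = self.batches.pop().unwrap();
                *self.batches.last_mut().unwrap() += newest;
            } else {
                break;
            }
        }
    }
}
```

Pushing 1,000 single-record batches behaves like a binary counter and leaves six batches, of sizes 512, 256, 128, 64, 32, and 8.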
Now, it isn't especially clear from this relation which operators we are talking about. There are some operator identifiers, and we can scroll up to check that they are what I say they are, but there is a better way. Especially if you like typing.
mcsherry=> create view sizes as
    select
        logs_operates.id,
        logs_operates.address,
        logs_operates.name,
        logs_arrangement.records,
        logs_arrangement.batches
    from
        logs_operates,
        logs_arrangement
    where
        logs_operates.id = logs_arrangement.operator and
        logs_operates.worker = logs_arrangement.worker
    ;
CREATE VIEW
Time: 0.529 ms
mcsherry=>
We just created a "view", which is the database terminology for a relation derived from other relations, probably through a query. At Materialize, Inc. we really like views, because we are pretty good at maintaining them as the input data change. We suspect you are going to be pleased with low-latency access to refreshed views of queries you have previously posed.
In this case, we were just hoping to look at some operator names and their numbers of records and batches, so it's probably not all that complicated.
mcsherry=> peek sizes;
id | address | name | records | batches
-----+------------+----------------------+---------+---------
118 | [4, 3] | Arrange: dual | 1 | 1
127 | [5, 1, 4] | Arrange: <temp_SNIP> | 1 | 1
149 | [6, 1, 7] | Arrange | 29 | 24209
155 | [6, 1, 10] | Arrange | 5 | 2
166 | [6, 1, 15] | Arrange: sizes | 26703 | 7
(5 rows)
Time: 9.349 ms
mcsherry=>
Wait, what? We have our two small relations up there, but .. we also have three new relations and apparently they are crazy.
In fact what we have are the dataflow operators for the newly installed view, sizes. It's a thing now, and because everything works we actually get to see it also. What's going on in sizes, you ask? Let's look at our logs_operates relation again to see which dataflows and operators are installed now.
mcsherry=> peek logs_operates;
id | worker | address | name
-----+--------+------------+---------------------
114 | 0 | [4] | Dataflow
115 | 0 | [4, 1] | UnorderedInput
116 | 0 | [4, 2] | Map
118 | 0 | [4, 3] | Arrange: dual
121 | 0 | [5] | Dataflow
122 | 0 | [5, 1, 1] | ToStream
123 | 0 | [5, 1, 2] | Map
125 | 0 | [5, 1, 3] | Map
127 | 0 | [5, 1, 4] | Arrange: <temp_SNIP>
130 | 0 | [5, 1] | Region
132 | 0 | [6] | Dataflow
133 | 0 | [6, 2] | logs_operates
135 | 0 | [6, 1, 1] | Map
137 | 0 | [6, 1, 2] | AsCollection
139 | 0 | [6, 3] | logs_arrangement
141 | 0 | [6, 1, 3] | Map
143 | 0 | [6, 1, 4] | AsCollection
145 | 0 | [6, 1, 5] | Filter
147 | 0 | [6, 1, 6] | Map
149 | 0 | [6, 1, 7] | Arrange
151 | 0 | [6, 1, 8] | Filter
153 | 0 | [6, 1, 9] | Map
155 | 0 | [6, 1, 10] | Arrange
157 | 0 | [6, 1, 11] | Join
160 | 0 | [6, 1, 12] | Map
162 | 0 | [6, 1, 13] | Map
164 | 0 | [6, 1, 14] | Map
166 | 0 | [6, 1, 15] | Arrange: sizes
169 | 0 | [6, 1] | Region
(29 rows)
Time: 1.502 ms
mcsherry=>
Apparently quite a bit. We'll need to work on tidying that up with time.
There is a fairly substantial new dataflow, operator 132, corresponding to our view sizes. The most important operators are the two anonymous Arrange operators leading in to the Join operator, fed by logs_operates and logs_arrangement. Those are the first two new arrangements up above (operators 149 and 155). There is also a third arrangement, Arrange: sizes, which holds on to the results of the dataflow so that we can quickly re-inspect it.
Quick like so:
mcsherry=> peek sizes;
id | address | name | records | batches
-----+------------+----------------------+---------+---------
118 | [4, 3] | Arrange: dual | 1 | 1
127 | [5, 1, 4] | Arrange: <temp_SNIP> | 1 | 1
149 | [6, 1, 7] | Arrange | 29 | 170025
155 | [6, 1, 10] | Arrange | 11 | 2
166 | [6, 1, 15] | Arrange: sizes | 431775 | 5
(5 rows)
Time: 65.000 ms
mcsherry=>
It's a bit slower than the first time we asked for it. And .. some of those numbers are starting to get worryingly large.
The first large number to talk about is the number of records associated with Arrange: sizes. It is almost at half a million here, and is climbing. In fact, we know that sizes has .. well five records up above, right? Where does the larger number come from?
Differential dataflow tracks changes to collections, effectively representing a log of the historical record for a relation. When given permission, it will work to compact that log, drawing down the number of records maintained until it is more in line with the number of records present in the relation.
We haven't given Arrange: sizes permission to compact its history.
We totally could, but at the moment it isn't something we do automatically, as it means that you lose the ability to perform certain types of queries (e.g. historical queries that dive in to what the relation was a minute ago). Until we give the arrangement permission to compact, it will maintain an increasingly large number of records.
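To make "compacting the log" a bit more concrete, here is a minimal Rust sketch under some simplifying assumptions: times are plain integers, updates are (data, time, diff) triples, and the compact function and its since parameter are names invented for this example rather than differential dataflow's API. Times earlier than since are advanced to since, after which updates to the same data at the same time can be added together and dropped if they cancel.

```rust
/// An update: which record changed, when, and by how much.
type Update = (&'static str, u64, i64);

/// Advances every time to at least `since`, then sums the diffs of updates
/// that now share a (data, time) pair and discards those that sum to zero.
fn compact(mut updates: Vec<Update>, since: u64) -> Vec<Update> {
    for update in updates.iter_mut() {
        update.1 = update.1.max(since);
    }
    // Sorting places updates with equal (data, time) next to each other.
    updates.sort();
    let mut result: Vec<Update> = Vec::new();
    for (data, time, diff) in updates {
        if let Some(last) = result.last_mut() {
            if last.0 == data && last.1 == time {
                last.2 += diff;
                continue;
            }
        }
        result.push((data, time, diff));
    }
    result.retain(|update| update.2 != 0);
    result
}
```

The cost is the one described above: once times before since have been collapsed, you can no longer ask what the relation looked like at those earlier times.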
On the plus side, it only maintains five "batches", which are indexed blobs of data (think of them as layers in a log-structured merge tree). This means that when we want some data we just have to search in at most five things, even if the things are quite large.
If you let this thing run for a while, it gets quite a bit larger and slower:
mcsherry=> peek sizes;
id | address | name | records | batches
-----+------------+----------------------+---------+---------
118 | [4, 3] | Arrange: dual | 1 | 1
127 | [5, 1, 4] | Arrange: <temp_SNIP> | 1 | 1
149 | [6, 1, 7] | Arrange | 29 | 817234
155 | [6, 1, 10] | Arrange | 9 | 3
166 | [6, 1, 15] | Arrange: sizes | 8106793 | 12
(5 rows)
Time: 1028.678 ms (00:01.029)
mcsherry=>
Oof. We should probably compact these things at some point, or encourage you not to list the contents of something with eight million updates that compact down to five. Or use more cores or something.
But, this isn't really a bug so much as a missing feature. We are compacting all of the log relations themselves, because they are internal relations rather than user-issued queries, and that works out pretty well:
mcsherry=> peek logs_arrangement;
operator | worker | records | batches
----------+--------+---------+---------
118 | 0 | 1 | 1
127 | 0 | 1 | 1
149 | 0 | 29 | 823879
155 | 0 | 5 | 2
166 | 0 | 8843463 | 10
(5 rows)
Time: 2.472 ms
mcsherry=>
This is just reporting the numbers, rather than actually traversing the data, so it is pretty fast. User traces will get compacted at some point (probably Monday) and get us back to these single-digit millisecond latencies.
The more exciting (for me) large number is harder to explain. Let's look at it again:
mcsherry=> peek sizes;
id | address | name | records | batches
-----+------------+----------------------+---------+---------
118 | [4, 3] | Arrange: dual | 1 | 1
127 | [5, 1, 4] | Arrange: <temp_SNIP> | 1 | 1
149 | [6, 1, 7] | Arrange | 29 | 170025
155 | [6, 1, 10] | Arrange | 11 | 2
166 | [6, 1, 15] | Arrange: sizes | 431775 | 5
(5 rows)
Time: 65.000 ms
mcsherry=>
Operator 149 holds 29 records spread across 170,025 batches. You don't need to be a rocket surgeon to conclude that most of these batches, all but at most 29, are empty.
Why so many batches?
Great question.
Also, why so many batches for operator 149 and not for 155, which is just the other input to the join operator?
Operator 149 arranges the data from logs_operates, which describes the dataflow operators. Unlike operator 155, which arranges logs_arrangement, the data flowing in to operator 149 stops flowing. The logs_arrangement stream keeps flowing, because the sizes dataflow keeps changing, because logs_arrangement keeps flowing, and ... Right.
But logs_operates stops changing. At least, until we create a new operator.
mcsherry=> create view temp as select count(*) from logs_text;
CREATE VIEW
Time: 0.442 ms
mcsherry=> peek sizes;
id | address | name | records | batches
-----+------------+----------------------+---------+---------
118 | [4, 3] | Arrange: dual | 1 | 1
127 | [5, 1, 4] | Arrange: <temp_SNIP> | 1 | 1
149 | [6, 1, 7] | Arrange | 57 | 12925
155 | [6, 1, 10] | Arrange | 10 | 1
166 | [6, 1, 15] | Arrange: sizes | 669924 | 10
184 | [7, 1, 6] | Arrange | 0 | 12928
194 | [7, 1, 11] | Arrange | 0 | 12927
201 | [7, 1, 15] | Arrange | 1 | 1
205 | [7, 1, 17] | Arrange | 0 | 1
224 | [7, 1, 25] | Arrange: temp | 1 | 2
(10 rows)
Time: 78.575 ms
We've got some new dataflow stuff in there, but more importantly the number of batches for operator 149 effectively reset. It's clearly going up again, but it dropped down to something much smaller briefly.
The problem is pretty easy to track down at this point. The join operator in differential dataflow prevents its input arrangements from merging batches until it has processed the batches, to be certain that it can correctly implement a streaming hash join (batches received on either input must be joined against the subset of batches already processed on the other input).
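The rule in that parenthetical can be sketched in a few lines of Rust. This is a toy, not differential dataflow's join operator: the StreamingJoin type and its methods are hypothetical, batches are plain vectors of (key, value) pairs, and matching is done with nested loops rather than any index.

```rust
/// A batch of keyed records.
type Batch = Vec<(u32, &'static str)>;
/// One joined output row: the key plus a value from each side.
type Joined = (u32, &'static str, &'static str);

/// Remembers the batches already processed on each input.
#[derive(Default)]
struct StreamingJoin {
    seen_left: Vec<Batch>,
    seen_right: Vec<Batch>,
}

impl StreamingJoin {
    /// Joins a new left batch against every right batch processed so far,
    /// then records the left batch as processed.
    fn push_left(&mut self, batch: Batch) -> Vec<Joined> {
        let mut output = Vec::new();
        for other in &self.seen_right {
            for &(key1, left) in &batch {
                for &(key2, right) in other {
                    if key1 == key2 {
                        output.push((key1, left, right));
                    }
                }
            }
        }
        self.seen_left.push(batch);
        output
    }

    /// The mirror image of `push_left` for the right input.
    fn push_right(&mut self, batch: Batch) -> Vec<Joined> {
        let mut output = Vec::new();
        for other in &self.seen_left {
            for &(key1, left) in other {
                for &(key2, right) in &batch {
                    if key1 == key2 {
                        output.push((key1, left, right));
                    }
                }
            }
        }
        self.seen_right.push(batch);
        output
    }
}
```

Each pair of matching records is produced exactly once, by whichever of the two batches is processed second. That only works if the operator knows precisely which batches it has processed on each side, which is why it holds back merging until it has seen them.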
Now, that works great in normal circumstances, but there is a quirk of timely dataflow. Empty batches .. can't be sent. You can only send data in timely dataflow if you hold a "timestamp capability", which comes from records themselves. Empty batches are minted in the absence of records, and so do not come with the capabilities that would allow them to be sent on the wire. The join operator needs to be smarter and consult the progress information timely dataflow provides it.
This turns out to be not all that hard.
I don't want to argue that this should all make sense, but I thought I would show you the code that fixes things. In this case, acknowledged1 is the upper limit of timestamps observed in input batches, which may need to be further advanced by a greater upper limit provided by timely dataflow through input1.frontier().
// We may learn that input frontiers have advanced with empty batches.
acknowledged1
    .as_mut()
    .map(|acks| {
        let mut antichain = timely::progress::frontier::Antichain::new();
        for ack in acks.iter() {
            for time in input1.frontier().frontier().iter() {
                antichain.insert(ack.join(time));
            }
        }
        acks.clear();
        acks.extend(antichain.elements().iter().cloned());
    });
The gist of what is going on here is that we have a promise that all future times will be greater than or equal to some element of acks, and independently will be greater than or equal to some element of input1.frontier(). This means that every future time must be greater than or equal to the least upper bound (lub) of some pair of elements, one from each set. Awkwardly, order theorists decided to call that operation join. Anyhow, we collect the lubs of all such pairs, and update acknowledged1 to be that new antichain.
Anti-what?
Pretty much.
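For the curious, here is a self-contained Rust sketch of the same computation that does not depend on timely dataflow. It assumes timestamps are pairs of integers compared coordinate-wise, so that some times are incomparable and antichains can hold more than one element. The functions less_equal, join, insert, and advance are written out by hand for this example; they are meant to mimic, not reproduce, what the Lattice trait and Antichain type do in the snippet above.

```rust
/// A partially ordered timestamp: pairs compared coordinate-wise.
type Time = (u64, u64);

/// True when `a` is less than or equal to `b` in both coordinates.
fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Least upper bound of two times: the coordinate-wise maximum.
fn join(a: &Time, b: &Time) -> Time {
    (a.0.max(b.0), a.1.max(b.1))
}

/// Keeps `antichain` a set of mutually incomparable minimal times: `time`
/// is added only if nothing present is already less than or equal to it,
/// and anything it is less than or equal to is removed.
fn insert(antichain: &mut Vec<Time>, time: Time) {
    if !antichain.iter().any(|t| less_equal(t, &time)) {
        antichain.retain(|t| !less_equal(&time, t));
        antichain.push(time);
    }
}

/// Joins every acknowledged time with every frontier time and returns the
/// minimal elements of the result.
fn advance(acks: &[Time], frontier: &[Time]) -> Vec<Time> {
    let mut result = Vec::new();
    for ack in acks {
        for time in frontier {
            insert(&mut result, join(ack, time));
        }
    }
    result
}
```

For example, advancing acks of (3, 0) and (0, 5) by a frontier of (2, 2) gives (3, 2) and (2, 5), while a frontier of (4, 6) dominates both and gives just (4, 6).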
The good news is that if you fix the issue in join and re-build, you get much better behavior out of operator 149:
mcsherry=> peek sizes;
id | address | name | records | batches
-----+------------+----------------------+---------+---------
118 | [4, 3] | Arrange: dual | 1 | 1
127 | [5, 1, 4] | Arrange: <temp_SNIP> | 1 | 1
149 | [6, 1, 7] | Arrange | 29 | 2
155 | [6, 1, 10] | Arrange | 5 | 1
166 | [6, 1, 15] | Arrange: sizes | 378255 | 9
(5 rows)
Time: 42.425 ms
mcsherry=>
Nice one, right? Still slow because of Arrange: sizes, but we know why that is. Gonna fix it, too.
It turns out materialized does a bit more than snoop around in introspected log data, even if that is pretty neat. I picked up the "eat your own dogfood" religion at MSR-SVC, and I wanted logging up and running so that we could use the system itself (or another one, stood up next to it) to understand the ways in which we were doing good or bad jobs.
Up next we might look at diagnosing latency spikes in streamed TPC-C changelogs against a TPC-H analytic workload, 'cause that is something we do. It's going to turn out that the answer is that Avro is slow, in case you want things spoiled for you.