## Materialize and Memory

Materialize keeps your SQL views up to date as the underlying data change.
To do this, it maintains valuable and relevant intermediate state, and requires random access to this data.
The most natural way to maintain random access to information is using memory, and memory can be expensive.

The value Materialize provides comes from how promptly it reflects new data, but its *cost* comes from the computer resources needed to achieve this.
While we often talk about the value Materialize provides, and work continually to improve it, we are also hard at work continually reducing the cost.
This work has had a marked impact recently, and it felt like a great time to tell you about it and the resulting reductions in cost.

Materialize maintains your source and derived data (e.g. any materialized view), durably in economical cloud storage.
However, to promptly maintain views and serve results we want to use much more immediately accessible storage.
This storage, memory or (as we'll see soon) local disk, acts as a cache that must be fast, but needn't be durable.
And of course, we would all like it to be as economical as possible.

We've been dialing down the amount of "overhead" associated with each intermediate maintained record in Materialize.
We started some months ago at roughly 96 bytes of overhead (we will explain why), and we are now closing in on between 0 and 16 bytes of overhead, depending.
This first wave of results has already seen many users' memory requirements reduced by nearly 2x.
Moreover, we've laid the groundwork for further improvements, through techniques like spill-to-disk, columnar layout, and compression.
This further work comes at the cost of CPU cycles, but for the moment CPU cycles are abundant (and elastic) in a way that bytes of memory are not.

In this post we'll map out where we started, detail the relatively simple steps we've taken to effectively reduce the overhead, and sketch the future we've opened up with some help from Rust.

### The Fundamentals of Remembered Things

Materialize models all data as relational rows, each of which has some number of columns, each of which contains one of a few different types of data.
Over time the rows come and go, each changing their multiplicity through what we call "updates": triples `(data, time, diff)`.
Each update indicates a row `data` that at some moment `time` experiences a change `diff` in its multiplicity.
These changes are often `+1` (insertion) or `-1` (deletion), or a mix of two or more (updates).
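
For a concrete (and purely illustrative) example, an update to a row shows up as a retraction of the old row paired with an insertion of the new one:
```rust
// An illustrative update stream, not actual Materialize output.
let updates: Vec<((&str, i64), u64, i64)> = vec![
    (("alice", 100), 10, 1),  // ("alice", 100) inserted at time 10
    (("alice", 100), 20, -1), // at time 20 the old row is retracted ...
    (("alice", 150), 20, 1),  // ... and the replacement row is inserted
];
```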

Materialize maintains *indexed state* by viewing each `data` as a pair `(key, val)`, where `key` are some specified columns and `val` the remaining columns.
When you create an index on a collection of data, you specify columns by which you hope to access the data; those columns define `key` and `val` for each `data`.
We regularly want to fetch the history of some `key`: the associated `val`s and the `(time, diff)` changes they have undergone.

The abstract data type we use maps from `key` to `val` to a list of `(time, diff)` pairs.
In Rust you might use the `HashMap` type to support this abstraction:
```rust
/// Map from key, to value, to a list of times and differences.
type Indexed<K, V, T, D> = HashMap<K, HashMap<V, Vec<(T, D)>>>;
```

For various reasons we won't actually want to use `HashMap` itself, and instead prefer other data structures that provide different performance characteristics.
For example, we are interested in minimizing the number and size of allocations, and optimizing for both random and sequential read and write throughput.
Not to ding Rust's `HashMap` at all, but its flexibility comes at a cost we won't want to pay.

### A First Evolution, circa many years ago

Differential dataflow's fundamental data structures are thus far based on sorted lists.
All of differential dataflow's historical performance, which has been pretty solid, has been based on [the perhaps surprising efficiency of sorted memory access](https://github.com/frankmcsherry/blog/blob/master/posts/2015-08-15.md).
You may have thought we were going to impress you with exotic improvements on Rust's `HashMap` implementation, but we are going to stay with sorted lists.

In the context of space efficiency, sorted lists have a compelling property that Rust's `HashMap` does not have: you can append multiple sorted lists into one larger list, and only need to record the boundaries between them.
This reduces the per-key and per-value overhead to something as small as an integer.
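
Concretely, the shape of the structure is roughly the following (a sketch; the names and details differ from differential dataflow's actual types):
```rust
/// A sketch of the appended-lists layout (illustrative only).
struct Layout<K, V, T, D> {
    /// Each key appears once, in sorted order, paired with an offset into `vals`.
    keys: Vec<(K, usize)>,
    /// All values, grouped by key, each paired with an offset into `upds`.
    vals: Vec<(V, usize)>,
    /// All `(time, diff)` updates, grouped by `(key, val)`.
    upds: Vec<(T, D)>,
}
```
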
Each key is present once, in sorted order.
The `usize` offset for each key tells you where to start in the `vals` vector, and you continue until the offset of the next key or the end of the vector.
The `usize` offset for each value tells you where to start in the `upds` vector, and you continue until the offset of the next value or the end of the vector.

The data structure supports high throughput sequential reads and writes, random access reads through binary search on keys, and random access writes through a [log-structured merge-tree](https://en.wikipedia.org/wiki/Log-structured_merge-tree) idiom (although perhaps "merge-list" is more appropriate).

The overhead is one `usize` for each key, and another `usize` for each distinct `(key, val)` pair.
You have three allocations, rather than a number proportional to the number of keys or key-value pairs.

Although Materialize maintains only two `usize` (each 8 bytes) beyond the `K`, `V`, `T`, and `D` data, those types bring overhead of their own.

In Materialize both `K` and `V` are `Row` types, which are variable-length byte sequences encoding column data.
In Rust a `Vec<u8>` provides a vector of bytes, and takes 24 bytes in addition to the binary data itself.
In fact we have used a 32 byte version that allows for some amount of in-line allocation, but this means the minimum size of `K` plus `V` is 64 bytes, potentially in addition to the binary row data itself.
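
As a sketch of the idea (illustrative only; Materialize's actual `Row` differs in its details), such a container stores short rows inline and spills longer ones to the heap:
```rust
/// Illustrative sketch of a 32-byte row container; not Materialize's actual `Row`.
enum RowBytes {
    /// Short rows live inline: no heap allocation at all.
    Inline { len: u8, data: [u8; 30] },
    /// Longer rows spill to a heap allocation (a `Vec<u8>` is 24 bytes).
    Spilled(Vec<u8>),
}
// On a typical 64-bit target this comes out to 32 bytes: a tag plus the
// 31-byte inline variant, rounded up to 8-byte alignment.
```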

Both `T` and `D` are each 8 byte integers, because there are many possible times, and many possible copies of the same record.
Adding these together, we get an overhead accounting of
```
key offset: 8 bytes
val offset: 8 bytes
key Row:    32 bytes
val Row:    32 bytes
time:        8 bytes
diff:        8 bytes
overhead:   96 bytes
```

Most of an index's contents are its "snapshot": the updates accumulated and consolidated as of some recent time. As changes happen we continually roll them up into the snapshot, so even a live, continually changing collection consists mostly of snapshot updates.
The snapshot updates commonly have `(time, diff)` equal to `(now, 1)`.
That is, each `(key, val)` pair in the snapshot exists "right now", and just once.
This provides an opportunity for bespoke compression: if a `(time, diff)` pair repeats we are able to avoid writing it down repeatedly.
In fact, we can sneak this in at zero overhead by taking advantage of a quirk in our `usize` offsets: they *should* always strictly increase to indicate ranges of updates, because empty ranges should not be recorded, but we can use a repetition (a non-increase) to indicate that the preceding updates should be reused as well.
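
To sketch the decoding (again illustrative, not the actual implementation): a value's updates run from its offset to the next offset that actually advances, so repeated offsets simply re-yield the preceding range.
```rust
/// Illustrative sketch: the `(time, diff)` slice for value `i`, where `offs[i]`
/// says where value i's updates start in `upds`, and a repeated (non-increasing)
/// offset means "reuse the preceding updates" rather than "empty range".
fn updates_for<'a, T, D>(offs: &[usize], upds: &'a [(T, D)], i: usize) -> &'a [(T, D)] {
    let lower = offs[i];
    // Scan forward to the next offset that actually advances; repeated offsets
    // belong to values that share this same range of updates.
    let upper = offs[i + 1..]
        .iter()
        .copied()
        .find(|&o| o > lower)
        .unwrap_or(upds.len());
    &upds[lower..upper]
}
```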

This typically saves 16 bytes per update for the snapshot, and brings us down to 80 bytes of overhead.
```
key offset: 8 bytes
val offset: 8 bytes
key Row:    32 bytes
val Row:    32 bytes
time:       ~0 bytes
diff:       ~0 bytes
overhead:   80 bytes
```

#### Optimizing `Row` representation

Although we have a 32 byte `Row` we could get by with much less.
Just like we appended lists and stored offsets to track the bounds, we could append lists of bytes into one large `Vec<u8>` and maintain only the `usize` offsets that tell us where each sequence starts and stops.
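
A sketch of that layout (illustrative; not the actual implementation):
```rust
/// Illustrative sketch: many rows appended into one allocation, with one
/// offset per row marking where its bytes end.
struct RowSlab {
    bytes: Vec<u8>,
    ends: Vec<usize>,
}

impl RowSlab {
    /// The byte slice of row `i`: from the previous row's end to this row's end.
    fn get(&self, i: usize) -> &[u8] {
        let start = if i == 0 { 0 } else { self.ends[i - 1] };
        &self.bytes[start..self.ends[i]]
    }
}
```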

This takes us from 32 bytes with the option for in-line allocation, to 8 bytes without that option.
This applies twice, once to each of `key` and `val`.
Moreover, we avoid an *allocation* for each `key` and `val`, which evades some further unaccounted overhead in and around memory management.
We now have four offsets, two for each of `key` and `val`, which will be important next.
```
key offset:        8 bytes
val offset:        8 bytes
key bytes offset:  8 bytes
val bytes offset:  8 bytes
overhead:         32 bytes
```

Representing these offsets more compactly, for example as 4-byte integers when their values fit, brings this down further to 16 bytes of overhead.

Going even further, these offsets often have very simple structure.
When there is exactly one value for each key (e.g. as in a primary key relationship) the key offsets are exactly the sequence 0, 1, 2, ...
When considering the snapshot, the value offsets are all zero (recall that repetitions indicate repeated `(time, diff)` pairs).
When the binary slices have the same length (e.g. for fixed-width columns) the corresponding row offsets are the integer multiples of this length.
Each of these cases can be encoded by a single "stride" and a length, using two integers in total rather than one per element.
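
A sketch of such an encoding (illustrative; the actual representation differs in its details):
```rust
/// Illustrative sketch: offsets that follow a regular pattern need only two
/// integers (a stride and a length); anything irregular falls back to a list.
enum Offsets {
    /// Offsets 0, stride, 2 * stride, ... for `length` entries.
    Strided { stride: usize, length: usize },
    /// An explicit offset per entry.
    Explicit(Vec<usize>),
}

impl Offsets {
    fn get(&self, index: usize) -> usize {
        match self {
            Offsets::Strided { stride, .. } => stride * index,
            Offsets::Explicit(offs) => offs[index],
        }
    }
}
```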

These further optimizations can bring the 16 bytes of overhead down, all the way to zero when stars align.

### Further Optimization and Future Work

Having driven the per-record overhead to nearly nothing, you might think we were done.
But in fact, there is still opportunity to further reduce cost!

#### Paging Binary Data to Disk

Materialize, by way of differential dataflow, performs its random accesses in a way that [resembles sequential scans](https://github.com/frankmcsherry/blog/blob/master/posts/2015-08-15.md) (essentially: batching and sorting look-ups before they happen).
This means that putting binary payload data on secondary storage like disk is not nearly as problematic as it would be were we to access it randomly, as in a hash map.
Disk is obviously substantially cheaper than memory, and it provides the opportunity to trade away peak responsiveness for some cost reduction.

In fact we've recently done this, backing in-memory allocations with disk allocations that Linux can spill to if it feels memory pressure.
Expect a post in the near future talking about the design and implementation of this paging layer.
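
As a rough sketch of the mechanism (this assumes the `memmap2` crate and is not the actual paging layer, which will get its own post), a buffer can be backed by a file so that the operating system is free to page it out under memory pressure:
```rust
// Sketch only: a file-backed, byte-addressable buffer. Clean pages can be
// dropped and dirty pages written back when memory gets tight, rather than
// counting against resident memory forever.
use memmap2::MmapMut;
use std::fs::OpenOptions;

fn file_backed_buffer(path: &str, len: u64) -> std::io::Result<MmapMut> {
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open(path)?;
    file.set_len(len)?;
    // Safety: nothing else maps or truncates this file while the mapping is live.
    unsafe { MmapMut::map_mut(&file) }
}
```
The mapping behaves like ordinary memory to the rest of the code, which is what lets the batched, sequential access pattern above tolerate it so well.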

Our experience so far is that initial snapshot computation experiences almost no degradation (the batch disk accesses are sequential scans), and once up and running, update volumes are often low enough that local SSD accesses do not prevent timely results.
The local disks are ephemeral caches, and don't come at the same cost as more durable options like cloud block storage.


#### Columnar Compression
Rust has some [handy mechanisms](https://blog.rust-lang.org/2022/10/28/gats-stabilization.html) for abstracting over how our row data is actually represented.
Our logic expects each row only as a sequence of `Datum` column values, and doesn't require an actual contiguous `[u8]` binary slab.
This allows us some flexibility in how we record each row, potentially as a `[u8]` sequence, but also potentially re-ordered, transformed, or compressed.
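
As a sketch of the kind of abstraction this enables (not Materialize's actual trait), a container can hand back rows that borrow from whatever backing representation it uses:
```rust
/// Illustrative sketch: a container of rows whose access type is free to
/// borrow from the container, thanks to generic associated types.
trait RowContainer {
    /// The borrowed form of a row; it might be a `&[u8]`, or something
    /// reconstructed on the fly from a columnar or compressed layout.
    type Row<'a> where Self: 'a;
    fn len(&self) -> usize;
    fn get<'a>(&'a self, index: usize) -> Self::Row<'a>;
}

/// The simplest implementation: rows are contiguous byte slices in one slab.
struct SlabContainer {
    bytes: Vec<u8>,
    bounds: Vec<(usize, usize)>, // (start, end) of each row in `bytes`
}

impl RowContainer for SlabContainer {
    type Row<'a> = &'a [u8] where Self: 'a;
    fn len(&self) -> usize {
        self.bounds.len()
    }
    fn get<'a>(&'a self, index: usize) -> Self::Row<'a> {
        let (start, end) = self.bounds[index];
        &self.bytes[start..end]
    }
}
```
A columnar or compressed container can implement the same trait and hand back a richer row type, without the operators above it changing.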

Cloud Data Warehouses often record their data in [columns](https://en.wikipedia.org/wiki/Column-oriented_DBMS), rather than rows, to improve their space efficiency while sacrificing random-access performance.
We don't want to sacrifice too much random access, but we can employ several of the same compression tricks.
In particular, we are able to sneak in various techniques, from [entropy coding](https://en.wikipedia.org/wiki/Entropy_coding) like Huffman and ANS, to [dictionary coding](https://en.wikipedia.org/wiki/Dictionary_coder) which often works well on denormalized relational data.
Moreover, we can apply these techniques column-by-column, as columns often exhibit more commonality than do rows.
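
As one example (a generic sketch of dictionary coding, not Materialize's implementation), a column whose values repeat heavily can be stored as a small dictionary plus one compact code per row:
```rust
use std::collections::HashMap;

/// Illustrative sketch of dictionary coding for a single column.
struct DictColumn {
    dict: Vec<String>, // each distinct value, once
    codes: Vec<u32>,   // one small code per row, indexing into `dict`
}

fn dictionary_encode(column: &[String]) -> DictColumn {
    let mut lookup: HashMap<&str, u32> = HashMap::new();
    let mut dict = Vec::new();
    let mut codes = Vec::with_capacity(column.len());
    for value in column {
        let code = *lookup.entry(value.as_str()).or_insert_with(|| {
            dict.push(value.clone());
            (dict.len() - 1) as u32
        });
        codes.push(code);
    }
    DictColumn { dict, codes }
}
```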

The benefits of compression depend greatly on the nature of the data itself, and come at a non-trivial CPU overhead, but would unlock significant space savings and further opportunities.

#### Query Optimization

A final, evergreen opportunity is to continue to reduce the amount of information we need to maintain, independent of how it is represented.
Materialize's optimizer pays specific attention to the amount of information maintained, which distinguishes it from most query optimizers that aim primarily to reduce query time.
How and where we maintain state is very much under our control, and something we still have many opportunities to improve.

### Wrapping Up

Materialize now maintains your indexed data with a fraction of the per-record overhead it once required: roughly 96 bytes when this work started, and between 0 and 16 bytes today. Spill-to-disk, columnar layout, and compression stand to reduce the cost further still, and we expect to write more about each of them soon.
