layer_map.rs
//!
//! The layer map tracks what layers exist in a timeline.
//!
//! When the timeline is first accessed, the server lists all layer files
//! in the timelines/<timeline_id> directory, and populates this map with
//! ImageLayer and DeltaLayer structs corresponding to each file. When the first
//! new WAL record is received, we create an InMemoryLayer to hold the incoming
//! records. Now and then, in the checkpoint() function, the in-memory layer
//! is frozen, then split up into new image and delta layers, and the
//! corresponding files are written to disk.
//!
//! Design overview:
//!
//! The `search` method of the layer map is on the read critical path, so we've
//! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
//! Other read methods are less critical but still impact performance of background tasks.
//!
//! This data structure relies on a persistent/immutable binary search tree. See the
//! following lecture for an introduction https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
//! Summary: A persistent/immutable BST (and persistent data structures in general) allows
//! you to modify the tree in such a way that each modification creates a new "version"
//! of the tree. When you modify it, you get a new version, but all previous versions are
//! still accessible too. So if someone is still holding a reference to an older version,
//! they continue to see the tree as it was then. The persistent BST stores all the
//! different versions in an efficient way.
//!
//! Our persistent BST maintains a map of which layer file "covers" each key. It has only
//! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
//! to handle the LSN dimension.
//!
//! To build the layer map, we insert each layer into the persistent BST in LSN.start order,
//! starting from the oldest one. After each insertion, we grab a reference to that "version"
//! of the tree, and store it in another tree, a BTreeMap keyed by the LSN. See
//! `historic_layer_coverage.rs`.
//!
//! To search for a particular key-LSN pair, you first look up the right "version" in the
//! BTreeMap. Then you search that version of the BST with the key.
//!
//! The persistent BST keeps all the versions, but there is no way to change the old versions
//! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
//! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
//! to throw away most of the persistent BST and build a new one, starting from the oldest
//! LSN. See `LayerMap::flush_updates()`.
//!
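//! For illustration only (the layer values and the concrete layer type below are
//! hypothetical, not defined in this module), the intended flow is roughly:
//!
//! ```ignore
//! let mut layer_map: LayerMap<dyn Layer> = LayerMap::default();
//!
//! // Batch insertions so the historic index is rebuilt once, on flush (or drop).
//! let mut updates = layer_map.batch_update();
//! updates.insert_historic(delta_layer_a); // oldest lsn.start first
//! updates.insert_historic(delta_layer_b);
//! updates.insert_historic(image_layer_c);
//! updates.flush();
//!
//! // A search picks the coverage "version" at end_lsn - 1, then queries it by key.
//! if let Some(result) = layer_map.search(key, end_lsn) {
//!     // result.layer covers `key`; result.lsn_floor bounds the next search
//! }
//! ```
//!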
mod historic_layer_coverage;
mod layer_coverage;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
use super::storage_layer::range_eq;
///
/// LayerMap tracks what layers exist on a timeline.
///
pub struct LayerMap<L: ?Sized> {
//
// 'open_layer' holds the current InMemoryLayer that is accepting new
// records. If it is None, 'next_open_layer_at' will be set instead, indicating
// the start LSN of the next InMemoryLayer that is to be created.
//
pub open_layer: Option<Arc<InMemoryLayer>>,
pub next_open_layer_at: Option<Lsn>,
///
/// Frozen layers, if any. Frozen layers are in-memory layers that
/// are no longer added to, but haven't been written out to disk
/// yet. They contain WAL older than the current 'open_layer' or
/// 'next_open_layer_at', but newer than any historic layer.
/// The frozen layers are in order from oldest to newest, so that
/// the newest one is in the 'back' of the VecDeque, and the oldest
/// in the 'front'.
///
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// Index of the historic layers optimized for search
historic: BufferedHistoricLayerCoverage<Arc<L>>,
/// L0 layers have key range Key::MIN..Key::MAX, so locating them through the key-based
/// search index is very inefficient. They are therefore also held in the `l0_delta_layers`
/// vector, in addition to the historic index.
l0_delta_layers: Vec<Arc<L>>,
}
impl<L: ?Sized> Default for LayerMap<L> {
fn default() -> Self {
Self {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
}
}
}
/// The primary update API for the layer map.
///
/// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly.
#[must_use]
pub struct BatchedUpdates<'a, L: ?Sized + Layer> {
// While we hold this exclusive reference to the layer map the type checker
// will prevent us from accidentally reading any unflushed updates.
layer_map: &'a mut LayerMap<L>,
}
/// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing.
impl<L> BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
///
/// Insert an on-disk layer.
///
pub fn insert_historic(&mut self, layer: Arc<L>) {
self.layer_map.insert_historic_noflush(layer)
}
///
/// Remove an on-disk layer from the map.
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer: Arc<L>) {
self.layer_map.remove_historic_noflush(layer)
}
// We will flush on drop anyway, but this method makes it
// more explicit that there is some work being done.
/// Apply all updates
pub fn flush(self) {
// Flush happens on drop
}
}
// Ideally the flush() method should be called explicitly for more
// controlled execution. But if we forget we'd rather flush on drop
// than panic later or read without flushing.
//
// TODO maybe warn if flush hasn't explicitly been called
impl<L> Drop for BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
fn drop(&mut self) {
self.layer_map.flush_updates();
}
}
/// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> {
pub layer: Arc<L>,
pub lsn_floor: Lsn,
}
impl<L> LayerMap<L>
where
L: ?Sized + Layer,
{
///
/// Find the latest layer (by lsn.end) that covers the given
/// 'key', with lsn.start < 'end_lsn'.
///
/// The caller of this function is the page reconstruction
/// algorithm looking for the next relevant delta layer, or
/// the terminal image layer. The caller will pass the lsn_floor
/// value as end_lsn in the next call to search.
///
/// If there's an image layer exactly below the given end_lsn,
/// search should return that layer regardless of whether there
/// are overlapping deltas.
///
/// If the latest layer is a delta and there is an image overlapping
/// it from below, the lsn_floor returned should be right above that
/// image so we don't skip it in the search. Otherwise the lsn_floor
/// returned should be the bottom of the delta layer, because we should
/// make as much progress down the lsn axis as possible. It's fine if
/// we skip some overlapping deltas this way, because the delta we
/// return contains the same WAL content.
///
/// TODO: This API is convoluted and inefficient. If the caller
/// makes N search calls, we'll end up finding the same latest
/// image layer N times. We should either cache the latest image
/// layer result, or simplify the api to `get_latest_image` and
/// `get_latest_delta`, and only call `get_latest_image` once.
///
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
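/// As a rough sketch of the intended call pattern (hypothetical variable
/// names, not the actual reconstruction code), the caller descends the LSN
/// axis by feeding `lsn_floor` back in as the next `end_lsn`:
///
/// ```ignore
/// let mut cont_lsn = Lsn(request_lsn.0 + 1);
/// while let Some(SearchResult { layer, lsn_floor }) = layer_map.search(key, cont_lsn) {
///     // ... read data for `key` from `layer` ...
///     if !layer.is_incremental() {
///         break; // terminal image layer: reconstruction can stop here
///     }
///     cont_lsn = lsn_floor;
/// }
/// ```
///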
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128());
match (latest_delta, latest_image) {
(None, None) => None,
(None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start;
Some(SearchResult {
layer: image,
lsn_floor,
})
}
(Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start;
Some(SearchResult {
layer: delta,
lsn_floor,
})
}
(Some(delta), Some(image)) => {
let img_lsn = image.get_lsn_range().start;
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match {
Some(SearchResult {
layer: image,
lsn_floor: img_lsn,
})
} else {
let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
Some(SearchResult {
layer: delta,
lsn_floor,
})
}
}
}
}
/// Start a batch of updates, applied on drop
pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> {
BatchedUpdates { layer_map: self }
}
///
/// Insert an on-disk layer
///
/// Helper function for BatchedUpdates::insert_historic
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
self.historic.insert(
historic_layer_coverage::LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
},
Arc::clone(&layer),
);
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
}
NUM_ONDISK_LAYERS.inc();
}
///
/// Remove an on-disk layer from the map.
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer: Arc<L>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
self.historic.remove(historic_layer_coverage::LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
});
if Self::is_l0(&layer) {
let len_before = self.l0_delta_layers.len();
// FIXME: ptr_eq might fail to return true for 'dyn'
// references. Clippy complains about this. In practice it
// seems to work (the assertion below would be triggered
// otherwise), but this ought to be fixed.
#[allow(clippy::vtable_address_comparisons)]
self.l0_delta_layers
.retain(|other| !Arc::ptr_eq(other, &layer));
assert_eq!(self.l0_delta_layers.len(), len_before - 1);
}
NUM_ONDISK_LAYERS.dec();
}
/// Helper function for BatchedUpdates::drop.
pub(self) fn flush_updates(&mut self) {
self.historic.rebuild();
}
/// Is there a newer image layer for the given key and LSN range? Or a set
/// of image layers within the specified LSN range that covers the entire
/// specified key range?
///
/// This is used for garbage collection, to determine if an old layer can
/// be deleted.
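///
/// For example (a hypothetical GC sketch, not the actual GC code), an old
/// layer is a candidate for removal if a newer image, or set of images,
/// fully covers its key range:
///
/// ```ignore
/// let covered = layer_map.image_layer_exists(
///     &old_layer.get_key_range(),
///     &(old_layer.get_lsn_range().end..gc_cutoff_lsn),
/// )?;
/// ```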
pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> Result<bool> {
if key.is_empty() {
// Vacuously true. There's a newer image for all 0 of the keys in the range.
return Ok(true);
}
let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
Some(v) => v,
None => return Ok(false),
};
let start = key.start.to_i128();
let end = key.end.to_i128();
let layer_covers = |layer: Option<Arc<L>>| match layer {
Some(layer) => layer.get_lsn_range().start >= lsn.start,
None => false,
};
// Check the start is covered
if !layer_covers(version.image_coverage.query(start)) {
return Ok(false);
}
// Check after all changes of coverage
for (_, change_val) in version.image_coverage.range(start..end) {
if !layer_covers(change_val) {
return Ok(false);
}
}
Ok(true)
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic.iter()
}
///
/// Divide the whole given range of keys into sub-ranges based on the latest
/// image layer that covers each range at the specified lsn (inclusive).
/// This is used when creating new image layers.
///
// FIXME: clippy complains that the result type is very complex. It's probably
// right...
#[allow(clippy::type_complexity)]
pub fn image_coverage(
&self,
key_range: &Range<Key>,
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
let version = match self.historic.get().unwrap().get_version(lsn.0) {
Some(v) => v,
None => return Ok(vec![]),
};
let start = key_range.start.to_i128();
let end = key_range.end.to_i128();
// Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut current_key = start;
let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take()));
current_key = change_key;
current_val = change_val.clone();
}
// Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push((kr, current_val.take()));
Ok(coverage)
}
pub fn is_l0(layer: &L) -> bool {
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
}
/// This function determines which layers are counted in `count_deltas`:
/// layers that should count towards deciding whether or not to reimage
/// a certain partition range.
///
/// There are two kinds of layers we currently consider reimage-worthy:
///
/// Case 1: Non-L0 layers are currently reimage-worthy by default.
/// TODO Some of these layers are very sparse and cover the entire key
/// range. Replacing 256MB of data (or less!) with terabytes of
/// images doesn't seem wise. We need a better heuristic, possibly
/// based on some of these factors:
/// a) whether this layer has any wal in this partition range
/// b) the size of the layer
/// c) the number of images needed to cover it
/// d) the estimated time until we'll have to reimage over it for GC
///
/// Case 2: Since L0 layers by definition cover the entire key space, we consider
/// them reimage-worthy only when the entire key space can be covered by very few
/// images (currently 1).
/// TODO The optimal number should probably be slightly higher than 1, but to
/// implement that we need to plumb a lot more context into this function
/// than just the current partition_range.
pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool {
// Case 1
if !Self::is_l0(layer) {
return true;
}
// Case 2
if range_eq(partition_range, &(Key::MIN..Key::MAX)) {
return true;
}
false
}
/// Count the height of the tallest stack of reimage-worthy deltas
/// in this 2d region.
///
/// If `limit` is provided we don't try to count above that number.
///
/// This number is used to compute the largest number of deltas that
/// we'll need to visit for any page reconstruction in this region.
/// We use this heuristic to decide whether to create an image layer.
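///
/// As an illustrative sketch (the bindings here are hypothetical), the image
/// creation heuristic might use it like this:
///
/// ```ignore
/// // Count reimage-worthy deltas stacked over this partition range since the
/// // last image at `img_lsn`, giving up once `threshold` is reached.
/// let depth = layer_map.count_deltas(&partition_range, &(img_lsn..lsn), Some(threshold))?;
/// let create_new_image = depth >= threshold;
/// ```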
pub fn count_deltas(
&self,
key: &Range<Key>,
lsn: &Range<Lsn>,
limit: Option<usize>,
) -> Result<usize> {
// We get the delta coverage of the region, and for each part of the coverage
// we recurse right underneath the delta. The recursion depth is limited by
// the largest result this function could return, which is in practice between
// 3 and 10 (since we usually try to create an image when the number gets larger).
if lsn.is_empty() || key.is_empty() || limit == Some(0) {
return Ok(0);
}
let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
Some(v) => v,
None => return Ok(0),
};
let start = key.start.to_i128();
let end = key.end.to_i128();
// Initialize loop variables
let mut max_stacked_deltas = 0;
let mut current_key = start;
let mut current_val = version.delta_coverage.query(start);
// Loop through the delta coverage and recurse on each part
for (change_key, change_val) in version.delta_coverage.range(start..end) {
// If there's a relevant delta in this part, add 1 and recurse down
if let Some(val) = current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max(
max_stacked_deltas,
base_count + max_stacked_deltas_underneath,
);
}
}
}
current_key = change_key;
current_val = change_val.clone();
}
// Consider the last part
if let Some(val) = current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(end);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max(
max_stacked_deltas,
base_count + max_stacked_deltas_underneath,
);
}
}
}
Ok(max_stacked_deltas)
}
/// Count how many reimage-worthy layers we need to visit for a given key-lsn pair.
///
/// The `partition_range` argument is used as context for the reimage-worthiness decision.
///
/// Used as a helper for correctness checks only. Performance not critical.
pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
match self.search(key, lsn) {
Some(search_result) => {
if search_result.layer.is_incremental() {
(Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
+ self.get_difficulty(search_result.lsn_floor, key, partition_range)
} else {
0
}
}
None => 0,
}
}
/// Used for correctness checking. Results are expected to be identical to
/// self.get_difficulty_map. Assumes self.search is correct.
pub fn get_difficulty_map_bruteforce(
&self,
lsn: Lsn,
partitioning: &KeyPartitioning,
) -> Vec<usize> {
// Looking at the difficulty as a function of key, it could only increase
// when a delta layer starts or an image layer ends. Therefore it's sufficient
// to check the difficulties at:
// - the key.start for each non-empty part range
// - the key.start for each delta
// - the key.end for each image
let keys_iter: Box<dyn Iterator<Item = Key>> = {
let mut keys: Vec<Key> = self
.iter_historic_layers()
.map(|layer| {
if layer.is_incremental() {
layer.get_key_range().start
} else {
layer.get_key_range().end
}
})
.collect();
keys.sort();
Box::new(keys.into_iter())
};
let mut keys_iter = keys_iter.peekable();
// Iter the partition and keys together and query all the necessary
// keys, computing the max difficulty for each part.
partitioning
.parts
.iter()
.map(|part| {
let mut difficulty = 0;
// Partition ranges are assumed to be sorted and disjoint
// TODO assert it
for range in &part.ranges {
if !range.is_empty() {
difficulty =
std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
}
while let Some(key) = keys_iter.peek() {
if key >= &range.end {
break;
}
let key = keys_iter.next().unwrap();
if key < range.start {
continue;
}
difficulty =
std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
}
}
difficulty
})
.collect()
}
/// For each part of a keyspace partitioning, return the maximum number of layers
/// that would be needed for page reconstruction in that part at the given LSN.
///
/// If `limit` is provided we don't try to count above that number.
///
/// This method is used to decide where to create new image layers. Computing the
/// result for the entire partitioning at once allows this function to be more
/// efficient, and further optimization is possible by using iterators instead,
/// to allow early return.
///
/// TODO actually use this method instead of count_deltas. Currently we only use
/// it for benchmarks.
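///
/// A minimal usage sketch (hypothetical bindings):
///
/// ```ignore
/// // Maximum reconstruction depth per partition part at `lsn`, capped at 3.
/// let difficulties = layer_map.get_difficulty_map(lsn, &partitioning, Some(3));
/// ```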
pub fn get_difficulty_map(
&self,
lsn: Lsn,
partitioning: &KeyPartitioning,
limit: Option<usize>,
) -> Vec<usize> {
// TODO This is a naive implementation. Perf improvements to do:
// 1. Instead of calling self.image_coverage and self.count_deltas,
// iterate the image and delta coverage only once.
partitioning
.parts
.iter()
.map(|part| {
let mut difficulty = 0;
for range in &part.ranges {
if limit == Some(difficulty) {
break;
}
for (img_range, last_img) in self
.image_coverage(range, lsn)
.expect("why would this err?")
{
if limit == Some(difficulty) {
break;
}
let img_lsn = if let Some(last_img) = last_img {
last_img.get_lsn_range().end
} else {
Lsn(0)
};
if img_lsn < lsn {
let num_deltas = self
.count_deltas(&img_range, &(img_lsn..lsn), limit)
.expect("why would this err lol?");
difficulty = std::cmp::max(difficulty, num_deltas);
}
}
}
difficulty
})
.collect()
}
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
Ok(self.l0_delta_layers.clone())
}
/// debugging function to print out the contents of the layer map
#[allow(unused)]
pub fn dump(&self, verbose: bool) -> Result<()> {
println!("Begin dump LayerMap");
println!("open_layer:");
if let Some(open_layer) = &self.open_layer {
open_layer.dump(verbose)?;
}
println!("frozen_layers:");
for frozen_layer in self.frozen_layers.iter() {
frozen_layer.dump(verbose)?;
}
println!("historic_layers:");
for layer in self.iter_historic_layers() {
layer.dump(verbose)?;
}
println!("End dump LayerMap");
Ok(())
}
}