Skip to content

Commit

Permalink
finish week 1 day 6
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
  • Loading branch information
skyzh committed Jan 21, 2024
1 parent a2d8b3c commit fa35a7d
Show file tree
Hide file tree
Showing 16 changed files with 376 additions and 22 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we
| 1.3 | Block Format ||||
| 1.4 | Table Format ||||
| 1.5 | Storage Engine - Read Path ||||
| 1.6 | Storage Engine - Write Path || 🚧 | 🚧 |
| 1.6 | Storage Engine - Write Path || | |
| 1.7 | Bloom Filter and Key Compression | | | |
| 2.1 | Compaction Implementation || 🚧 | 🚧 |
| 2.2 | Compaction Strategy - Simple || 🚧 | 🚧 |
Expand Down
2 changes: 1 addition & 1 deletion mini-lsm-book/src/week1-01-memtable.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ fn force_flush_next_imm_memtable(&self) {

This ensures only one thread will be able to modify the LSM state while still allowing concurrent access to the LSM storage.

In this task, you will need to modify `put` and `delete` to respect the soft capacity limit on the memtable. When it reaches the limit, call `force_freeze_memtable` to freeze the memtable. Note that we do not have test cases over this concurrent scenario, and you will need to think about all possible race conditions on your own. Also, remember to check lock regions to ensure the critical regions are the minimum required.
In this task, you will need to modify `put` and `delete` to respect the soft capacity limit on the memtable. When it reaches the limit, call `force_freeze_memtable` to freeze the memtable. Note that we do not have test cases over this concurrent scenario, and you will need to think about all possible race conditions on your own. Also, remember to check lock regions to ensure the critical sections are the minimum required.

You can simply assign the next memtable id as `self.next_sst_id()`. Note that the `imm_memtables` stores the memtables from the latest one to the earliest one. That is to say, `imm_memtables.first()` should be the last frozen memtable.

Expand Down
8 changes: 4 additions & 4 deletions mini-lsm-book/src/week1-05-read-path.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ type LsmIteratorInner =

So that our internal iterator of the LSM storage engine will be an iterator combining both data from the memtables and the SSTs.

Note that our SST iterator does not support passing a end bound to it. Therefore, we will need to handle the `end_bound` manually in `LsmIterator`. You will need to modify your `LsmIterator` logic to stop when the key from the inner iterator reaches the end boundary.
Note that our SST iterator does not support passing an end bound to it. Therefore, you will need to handle the `end_bound` manually in `LsmIterator`. You will need to modify your `LsmIterator` logic to stop when the key from the inner iterator reaches the end boundary.

Our test cases will generate some memtables and SSTs in `l0_sstables`, and you will need to scan all of these data out correctly in this task. You do not need to flush SSTs until next chapter. Therefore, you can go ahead and modify your `LsmStorageInner::scan` interface to create a merge iterator over all memtables and SSTs, so as to finish the read path of your storage engine.

Because `SsTableIterator::create` involves I/O operations and might be slow, we do not want to do this in the `state` critical region. Therefore, you should firstly take read the `state` and clone the `Arc` of the LSM state snapshot. Then, you should drop the lock. After that, you can go through all L0 SSTs and create iterators for each of them, then create a merge iterator to retrieve the data.
Because `SsTableIterator::create` involves I/O operations and might be slow, we do not want to do this in the `state` critical section. Therefore, you should firstly take read the `state` and clone the `Arc` of the LSM state snapshot. Then, you should drop the lock. After that, you can go through all L0 SSTs and create iterators for each of them, then create a merge iterator to retrieve the data.

```rust,no_run
fn scan(&self) {
let snapshot = {
let guard = self.state.read();
Arc::clone(&guard)
}
};
// create iterators and seek them
}
```
Expand All @@ -73,7 +73,7 @@ In this task, you will need to modify:
src/lsm_storage.rs
```

For get requests, it will be processed as lookups in the memtables, and then scans on the SSTs. You can create a merge iterator over all SSTs after probing all memtables. You can seek to the key that the user wants to lookup. There are two possibilities of the seek: the key is the same as what the user probes, and the key is not the same / does not exist. You should only return the value to the user when the key exists and is the same as probed. You should also reduce the critical region of the state lock as in the previous section. Also remember to handle deleted keys.
For get requests, it will be processed as lookups in the memtables, and then scans on the SSTs. You can create a merge iterator over all SSTs after probing all memtables. You can seek to the key that the user wants to lookup. There are two possibilities of the seek: the key is the same as what the user probes, and the key is not the same / does not exist. You should only return the value to the user when the key exists and is the same as probed. You should also reduce the critical section of the state lock as in the previous section. Also remember to handle deleted keys.

## Test Your Understanding

Expand Down
97 changes: 96 additions & 1 deletion mini-lsm-book/src/week1-06-write-path.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,108 @@ cargo x scheck

## Task 1: Flush Memtable to SST

## Task 2: Update the LSM State
At this point, we have all in-memory things and on-disk files ready, and the storage engine is able to read and merge the data from all these structures. Now, we are going to implement the logic to move things from memory to the disk (so-called flush), and complete the Mini-LSM week 1 tutorial.

In this task, you will need to modify:

```
src/lsm_storage.rs
src/mem_table.rs
```

You will need to modify `LSMStorageInner::force_flush_next_imm_memtable` and `MemTable::flush`. In `LSMStorageInner::open`, you will need to create the LSM database directory if it does not exist. To flush a memtable to the disk, we will need to do two things:

* Select a memtable to flush.
* Create an SST file corresponding to a memtable.
* Remove the memtable from the immutable memtable list and add the SST file to L0 SSTs.

We have not explained what is L0 (level-0) SSTs for now. In general, they are the set of SSTs files directly created as a result of memtable flush. In week 1 of this tutorial, we will only have L0 SSTs on the disk. We will dive into how to organize them efficiently using leveled or tiered structure on the disk in week 2.

Note that creating an SST file is a compute-heavy and a costly operation. Again, we do not want to hold the `state` read/write lock for a long time, as it might block other operations and create huge latency spikes in the LSM operations. Also, we use the `state_lock` mutex to serialize state modification operations in the LSM tree. In this task, you will need to think carefully how to use these locks to make the LSM state modification race-condition free while minimizing critical sections.

We do not have concurrent test cases and you will need to think carefully about your implementation. Also, remember that the last memtable in the immutable memtable list is the earliest one, and is the one that you should flush.

<details>

<summary>Spoilers: Flush L0 Pseudo Code</summary>

```rust,no_run
fn flush_l0(&self) {
let _state_lock = self.state_lock.lock();
let memtable_to_flush;
let snapshot = {
let guard = self.state.read();
memtable_to_flush = guard.imm_memtables.last();
};
let sst = memtable_to_flush.flush()?;
{
let guard = self.state.write();
guard.imm_memtables.pop();
guard.l0_sstables.insert(0, sst);
};
}
```

</details>

## Task 2: Flush Trigger

In this task, you will need to modify:

```
src/lsm_storage.rs
src/compact.rs
```

When the number of memtables (immutable + mutable) in memory exceeds the `num_memtable_limit` in LSM storage options, you should flush the earliest memtable to the disk. This is done by a flush thread in the background. The flush thread will be started with the `MiniLSM` structure. We have already implemented necessary code to start the thread and properly stop the thread.

In this task, you will need to implement `LsmStorageInner::trigger_flush` in `compact.rs`, and `MiniLsm::close` in `lsm_storage.rs`. `trigger_flush` will be executed every 50 milliseconds. If the number of memtables exceed the limit, you should call `force_flush_next_imm_memtable` to flush a memtable. When the user calls the `close` function, you should wait until the flush thread (and the compaction thread in week 2) to finish.

## Task 3: Filter the SSTs

Now that you have a fully working storage engine, and you can use the mini-lsm-cli to interact with your storage engine.

```shell
cargo run --bin mini-lsm-cli -- --compaction none
```

And then,

```
fill 1000 3000
get 2333
flush
fill 1000 3000
get 2333
flush
get 2333
scan 2000 2333
```

If you fill more data, you can see your flush thread working and automatically flushing the L0 SSTs without using the `flush` command.

And lastly, let us implement a simple optimization on filtering the SSTs before we end this week. Based on the key range that the user provides, we can easily filter out some SSTs that do not contain the key range, so that we do not need to read them in the merge iterator.

In this task, you will need to modify:

```
src/lsm_storage.rs
src/iterators/*
src/lsm_iterator.rs
```

You will need to change your read path functions to skip the SSTs that is impossible to contain the key/key range. You will need to implement `num_active_iterators` for your iterators so that the test cases can do the check on whether your implementation is correct or not. For `MergeIterator` and `TwoMergeIterator`, it is the sum of `num_active_iterators` of all children iterators. Note that if you did not modify the fields in the starter code of `MergeIterator`, remember to also take `MergeIterator::current` into account. For `LsmIterator` and `FusedIterator`, simply return the number of active iterators from the inner iterator.

You can implement helper functions like `range_overlap` and `key_within` to simplify your code.

## Test Your Understanding

* What happens if a user requests to delete a key twice?
* How much memory (or number of blocks) will be loaded into memory at the same time when the iterator is initialized?

We do not provide reference answers to the questions, and feel free to discuss about them in the Discord community.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use std::path::PathBuf;
mod wrapper;
use wrapper::mini_lsm_wrapper;

use anyhow::Result;
use bytes::Bytes;
use clap::{Parser, ValueEnum};
use mini_lsm::compact::{
use mini_lsm_wrapper::compact::{
CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions,
TieredCompactionOptions,
};
use mini_lsm::iterators::StorageIterator;
use mini_lsm::lsm_storage::{LsmStorageOptions, MiniLsm};
use mini_lsm_wrapper::iterators::StorageIterator;
use mini_lsm_wrapper::lsm_storage::{LsmStorageOptions, MiniLsm};
use std::path::PathBuf;

#[derive(Debug, Clone, ValueEnum)]
enum CompactionStrategy {
Simple,
Leveled,
Tiered,
None,
}

#[derive(Parser, Debug)]
Expand All @@ -37,6 +40,7 @@ fn main() -> Result<()> {
target_sst_size: 2 << 20, // 2MB
num_memtable_limit: 3,
compaction_options: match args.compaction {
CompactionStrategy::None => CompactionOptions::NoCompaction,
CompactionStrategy::Simple => {
CompactionOptions::Simple(SimpleLeveledCompactionOptions {
size_ratio_percent: 200,
Expand Down
6 changes: 6 additions & 0 deletions mini-lsm-starter/src/bin/wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub mod mini_lsm_wrapper {
pub use mini_lsm_starter::*;
}

#[allow(dead_code)]
fn main() {}
23 changes: 23 additions & 0 deletions mini-lsm-starter/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,27 @@ impl LsmStorageInner {
}
Ok(None)
}

fn trigger_flush(&self) -> Result<()> {
Ok(())
}

pub(crate) fn spawn_flush_thread(
self: &Arc<Self>,
rx: crossbeam_channel::Receiver<()>,
) -> Result<Option<std::thread::JoinHandle<()>>> {
let this = self.clone();
let handle = std::thread::spawn(move || {
let ticker = crossbeam_channel::tick(Duration::from_millis(50));
loop {
crossbeam_channel::select! {
recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
eprintln!("flush failed: {}", e);
},
recv(rx) -> _ => return
}
}
});
return Ok(Some(handle));
}
}
5 changes: 5 additions & 0 deletions mini-lsm-starter/src/iterators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ pub trait StorageIterator {

/// Move to the next position.
fn next(&mut self) -> anyhow::Result<()>;

/// Number of underlying active iterators for this iterator.
fn num_active_iterators(&self) -> usize {
1
}
}
23 changes: 21 additions & 2 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ impl LsmStorageOptions {
num_memtable_limit: 50,
}
}

pub fn default_for_week1_day6_test() -> Self {
Self {
block_size: 4096,
target_sst_size: 2 << 20,
compaction_options: CompactionOptions::NoCompaction,
enable_wal: false,
num_memtable_limit: 2,
}
}
}

/// The storage interface of the LSM tree.
Expand All @@ -96,6 +106,10 @@ pub(crate) struct LsmStorageInner {
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
pub struct MiniLsm {
pub(crate) inner: Arc<LsmStorageInner>,
/// Notifies the L0 flush thread to stop working. (In week 1 day 6)
flush_notifier: crossbeam_channel::Sender<()>,
/// The handle for the compaction thread. (In week 1 day 6)
flush_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
/// Notifies the compaction thread to stop working. (In week 2)
compaction_notifier: crossbeam_channel::Sender<()>,
/// The handle for the compaction thread. (In week 2)
Expand All @@ -105,6 +119,7 @@ pub struct MiniLsm {
impl Drop for MiniLsm {
fn drop(&mut self) {
self.compaction_notifier.send(()).ok();
self.flush_notifier.send(()).ok();
}
}

Expand All @@ -117,11 +132,15 @@ impl MiniLsm {
/// not exist.
pub fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Arc<Self>> {
let inner = Arc::new(LsmStorageInner::open(path, options)?);
let (tx, rx) = crossbeam_channel::unbounded();
let (tx1, rx) = crossbeam_channel::unbounded();
let compaction_thread = inner.spawn_compaction_thread(rx)?;
let (tx2, rx) = crossbeam_channel::unbounded();
let flush_thread = inner.spawn_flush_thread(rx)?;
Ok(Arc::new(Self {
inner,
compaction_notifier: tx,
flush_notifier: tx2,
flush_thread: Mutex::new(flush_thread),
compaction_notifier: tx1,
compaction_thread: Mutex::new(compaction_thread),
}))
}
Expand Down
2 changes: 1 addition & 1 deletion mini-lsm-starter/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl MemTable {
unimplemented!()
}

/// Flush the mem-table to SSTable.
/// Flush the mem-table to SSTable. Implement in week 1 day 6.
pub fn flush(&self, _builder: &mut SsTableBuilder) -> Result<()> {
unimplemented!()
}
Expand Down
1 change: 0 additions & 1 deletion mini-lsm-starter/src/storage.rs

This file was deleted.

4 changes: 0 additions & 4 deletions mini-lsm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,3 @@ serde = { version = "1.0", features = ["derive"] }

[dev-dependencies]
tempfile = "3"

[[bin]]
name = "mini-lsm-cli"
path = "src/bin/mini_lsm_cli.rs"
1 change: 1 addition & 0 deletions mini-lsm/src/bin/mini-lsm-cli.rs
6 changes: 6 additions & 0 deletions mini-lsm/src/bin/wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub mod mini_lsm_wrapper {
pub use mini_lsm::*;
}

#[allow(dead_code)]
fn main() {}
5 changes: 2 additions & 3 deletions mini-lsm/src/tests/week1_day5.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::ops::Bound;

use self::harness::generate_sst;
use self::harness::{check_iter_result, MockIterator};
use bytes::Bytes;
use tempfile::tempdir;
use week1_day5::harness::generate_sst;

use self::harness::{check_iter_result, MockIterator};

use super::*;
use crate::{
Expand Down
Loading

0 comments on commit fa35a7d

Please sign in to comment.