From fa35a7dc9eaf86396f79ade1da1066009184d987 Mon Sep 17 00:00:00 2001
From: Alex Chi Z
Date: Sun, 21 Jan 2024 17:40:47 +0800
Subject: [PATCH] finish week 1 day 6

Signed-off-by: Alex Chi Z
---
 README.md                                |   2 +-
 mini-lsm-book/src/week1-01-memtable.md   |   2 +-
 mini-lsm-book/src/week1-05-read-path.md  |   8 +-
 mini-lsm-book/src/week1-06-write-path.md |  97 ++++++++-
 .../src/bin/mini-lsm-cli.rs              |  12 +-
 mini-lsm-starter/src/bin/wrapper.rs      |   6 +
 mini-lsm-starter/src/compact.rs          |  23 ++
 mini-lsm-starter/src/iterators.rs        |   5 +
 mini-lsm-starter/src/lsm_storage.rs      |  23 +-
 mini-lsm-starter/src/mem_table.rs        |   2 +-
 mini-lsm-starter/src/storage.rs          |   1 -
 mini-lsm/Cargo.toml                      |   4 -
 mini-lsm/src/bin/mini-lsm-cli.rs         |   1 +
 mini-lsm/src/bin/wrapper.rs              |   6 +
 mini-lsm/src/tests/week1_day5.rs         |   5 +-
 mini-lsm/src/tests/week1_day6.rs         | 201 ++++++++++++++++++
 16 files changed, 376 insertions(+), 22 deletions(-)
 rename mini-lsm/src/bin/mini_lsm_cli.rs => mini-lsm-starter/src/bin/mini-lsm-cli.rs (93%)
 create mode 100644 mini-lsm-starter/src/bin/wrapper.rs
 delete mode 100644 mini-lsm-starter/src/storage.rs
 create mode 120000 mini-lsm/src/bin/mini-lsm-cli.rs
 create mode 100644 mini-lsm/src/bin/wrapper.rs
 create mode 100644 mini-lsm/src/tests/week1_day6.rs

diff --git a/README.md b/README.md
index b6749311..10419c74 100644
--- a/README.md
+++ b/README.md
@@ -45,7 +45,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we
 | 1.3 | Block Format | ✅ | ✅ | ✅ |
 | 1.4 | Table Format | ✅ | ✅ | ✅ |
 | 1.5 | Storage Engine - Read Path | ✅ | ✅ | ✅ |
-| 1.6 | Storage Engine - Write Path | ✅ | 🚧 | 🚧 |
+| 1.6 | Storage Engine - Write Path | ✅ | ✅ | ✅ |
 | 1.7 | Bloom Filter and Key Compression | | | |
 | 2.1 | Compaction Implementation | ✅ | 🚧 | 🚧 |
 | 2.2 | Compaction Strategy - Simple | ✅ | 🚧 | 🚧 |
diff --git a/mini-lsm-book/src/week1-01-memtable.md b/mini-lsm-book/src/week1-01-memtable.md
index 32f6c7de..c24be38b 100644
--- a/mini-lsm-book/src/week1-01-memtable.md
+++ b/mini-lsm-book/src/week1-01-memtable.md
@@ -137,7 +137,7 @@ fn force_flush_next_imm_memtable(&self) {
 
 This ensures only one thread will be able to modify the LSM state while still allowing concurrent access to the LSM storage.
 
-In this task, you will need to modify `put` and `delete` to respect the soft capacity limit on the memtable. When it reaches the limit, call `force_freeze_memtable` to freeze the memtable. Note that we do not have test cases over this concurrent scenario, and you will need to think about all possible race conditions on your own. Also, remember to check lock regions to ensure the critical regions are the minimum required.
+In this task, you will need to modify `put` and `delete` to respect the soft capacity limit on the memtable. When it reaches the limit, call `force_freeze_memtable` to freeze the memtable. Note that we do not have test cases for this concurrent scenario, and you will need to think about all possible race conditions on your own. Also, remember to check your locking to ensure the critical sections are the minimum required.
 
 You can simply assign the next memtable id as `self.next_sst_id()`. Note that the `imm_memtables` stores the memtables from the latest one to the earliest one. That is to say, `imm_memtables.first()` should be the last frozen memtable.
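+
+To make the freeze trigger concrete, here is a minimal sketch of the capacity check in `put`. It assumes the `approximate_size` counter on the memtable and the `target_sst_size` option from the starter code; treat it as an illustration of the check-lock-recheck pattern, not as the reference solution:
+
+```rust,no_run
+fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
+    let estimated_size = {
+        let guard = self.state.read();
+        guard.memtable.put(key, value)?;
+        guard.memtable.approximate_size()
+    };
+    if estimated_size >= self.options.target_sst_size {
+        let state_lock = self.state_lock.lock();
+        let guard = self.state.read();
+        // re-check under `state_lock`: another thread may have frozen the memtable already
+        if guard.memtable.approximate_size() >= self.options.target_sst_size {
+            drop(guard);
+            self.force_freeze_memtable(&state_lock)?;
+        }
+    }
+    Ok(())
+}
+```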
diff --git a/mini-lsm-book/src/week1-05-read-path.md b/mini-lsm-book/src/week1-05-read-path.md
index dcc9669a..47c6100b 100644
--- a/mini-lsm-book/src/week1-05-read-path.md
+++ b/mini-lsm-book/src/week1-05-read-path.md
@@ -47,18 +47,18 @@ type LsmIteratorInner =
 
 So that our internal iterator of the LSM storage engine will be an iterator combining both data from the memtables and the SSTs.
 
-Note that our SST iterator does not support passing a end bound to it. Therefore, we will need to handle the `end_bound` manually in `LsmIterator`. You will need to modify your `LsmIterator` logic to stop when the key from the inner iterator reaches the end boundary.
+Note that our SST iterator does not support passing an end bound to it. Therefore, you will need to handle the `end_bound` manually in `LsmIterator`. You will need to modify your `LsmIterator` logic to stop when the key from the inner iterator reaches the end boundary.
 
 Our test cases will generate some memtables and SSTs in `l0_sstables`, and you will need to scan all of this data out correctly in this task. You do not need to flush SSTs until the next chapter. Therefore, you can go ahead and modify your `LsmStorageInner::scan` interface to create a merge iterator over all memtables and SSTs, so as to finish the read path of your storage engine.
 
-Because `SsTableIterator::create` involves I/O operations and might be slow, we do not want to do this in the `state` critical region. Therefore, you should firstly take read the `state` and clone the `Arc` of the LSM state snapshot. Then, you should drop the lock. After that, you can go through all L0 SSTs and create iterators for each of them, then create a merge iterator to retrieve the data.
+Because `SsTableIterator::create` involves I/O operations and might be slow, we do not want to do this in the `state` critical section. Therefore, you should first take a read lock on `state` and clone the `Arc` of the LSM state snapshot. Then, you should drop the lock. After that, you can go through all L0 SSTs and create iterators for each of them, then create a merge iterator to retrieve the data.
 
 ```rust,no_run
 fn scan(&self) {
     let snapshot = {
         let guard = self.state.read();
         Arc::clone(&guard)
-    }
+    };
     // create iterators and seek them
 }
 ```
@@ -73,7 +73,7 @@ In this task, you will need to modify:
 src/lsm_storage.rs
 ```
 
-For get requests, it will be processed as lookups in the memtables, and then scans on the SSTs. You can create a merge iterator over all SSTs after probing all memtables. You can seek to the key that the user wants to lookup. There are two possibilities of the seek: the key is the same as what the user probes, and the key is not the same / does not exist. You should only return the value to the user when the key exists and is the same as probed. You should also reduce the critical region of the state lock as in the previous section. Also remember to handle deleted keys.
+Get requests are processed as lookups in the memtables, followed by scans on the SSTs. You can create a merge iterator over all SSTs after probing all memtables, and seek it to the key that the user wants to look up. There are two possibilities after the seek: the key is the same as what the user probes, or the key is not the same / does not exist. You should only return the value to the user when the key exists and is the same as probed. You should also reduce the critical section of the state lock as in the previous section. Also remember to handle deleted keys.
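+
+To make this concrete, here is a hedged sketch of `get`. It assumes the `sstables` map in the LSM state and `SsTableIterator::create_and_seek_to_key` from the table chapter, and it treats keys as raw byte slices; depending on your key representation you may need a conversion such as `KeySlice::from_slice` when seeking and comparing:
+
+```rust,no_run
+fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
+    // clone the state snapshot and drop the lock before doing any I/O
+    let snapshot = {
+        let guard = self.state.read();
+        Arc::clone(&guard)
+    };
+    // probe the mutable memtable first, then the immutable ones (latest first)
+    for memtable in std::iter::once(&snapshot.memtable).chain(snapshot.imm_memtables.iter()) {
+        if let Some(value) = memtable.get(key) {
+            // an empty value is a delete tombstone
+            return Ok(if value.is_empty() { None } else { Some(value) });
+        }
+    }
+    // otherwise, seek a merge iterator over all L0 SSTs to the key
+    let mut iters = Vec::with_capacity(snapshot.l0_sstables.len());
+    for sst_id in snapshot.l0_sstables.iter() {
+        let table = snapshot.sstables[sst_id].clone();
+        iters.push(Box::new(SsTableIterator::create_and_seek_to_key(table, key)?));
+    }
+    let iter = MergeIterator::create(iters);
+    if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
+        return Ok(Some(Bytes::copy_from_slice(iter.value())));
+    }
+    Ok(None)
+}
+```
 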
 ## Test Your Understanding
diff --git a/mini-lsm-book/src/week1-06-write-path.md b/mini-lsm-book/src/week1-06-write-path.md
index 2818d96a..7e22ca23 100644
--- a/mini-lsm-book/src/week1-06-write-path.md
+++ b/mini-lsm-book/src/week1-06-write-path.md
@@ -17,13 +17,108 @@ cargo x scheck
 
 ## Task 1: Flush Memtable to SST
 
-## Task 2: Update the LSM State
+At this point, we have all the in-memory structures and on-disk files ready, and the storage engine is able to read and merge the data from all of them. Now, we are going to implement the logic that moves things from memory to the disk (the so-called flush), and complete the Mini-LSM week 1 tutorial.
+
+In this task, you will need to modify:
+
+```
+src/lsm_storage.rs
+src/mem_table.rs
+```
+
+You will need to modify `LsmStorageInner::force_flush_next_imm_memtable` and `MemTable::flush` (a sketch of the `MemTable::flush` side follows the spoiler below). In `LsmStorageInner::open`, you will need to create the LSM database directory if it does not exist. To flush a memtable to the disk, we will need to do three things:
+
+* Select a memtable to flush.
+* Create an SST file corresponding to the memtable.
+* Remove the memtable from the immutable memtable list and add the SST file to the L0 SSTs.
+
+We have not explained what L0 (level-0) SSTs are yet. In general, they are the set of SST files directly created as a result of memtable flushes. In week 1 of this tutorial, we will only have L0 SSTs on the disk. We will dive into how to organize them efficiently on disk using a leveled or tiered structure in week 2.
+
+Note that creating an SST file is a compute-heavy and costly operation. Again, we do not want to hold the `state` read/write lock for a long time, as it might block other operations and create huge latency spikes in the LSM operations. Also, we use the `state_lock` mutex to serialize state modification operations in the LSM tree. In this task, you will need to think carefully about how to use these locks to make the LSM state modifications free of race conditions while minimizing critical sections.
+
+We do not have concurrent test cases, so you will need to think carefully about your implementation. Also, remember that the last memtable in the immutable memtable list is the earliest one, and it is the one that you should flush.
+
+<details>
+
+<summary>Spoilers: Flush L0 Pseudo Code</summary>
+
+```rust,no_run
+fn flush_l0(&self) {
+    let _state_lock = self.state_lock.lock();
+
+    // hold the read lock only long enough to clone the memtable to flush
+    let memtable_to_flush = {
+        let guard = self.state.read();
+        guard.imm_memtables.last().unwrap().clone()
+    };
+
+    // build the SST outside of any lock on `state`
+    let sst = memtable_to_flush.flush()?;
+
+    {
+        // with an Arc-wrapped state, clone the snapshot, mutate it, and store it back
+        let mut guard = self.state.write();
+        guard.imm_memtables.pop();
+        guard.l0_sstables.insert(0, sst);
+    };
+}
+```
+
+</details>
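+
+For the `MemTable::flush` side of Task 1, a minimal sketch follows. It assumes the crossbeam `SkipMap`-backed `map` field from the memtable chapter and a byte-slice `add` API on `SsTableBuilder`; if your builder takes a key wrapper type, convert accordingly:
+
+```rust,no_run
+pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> {
+    // the skip map iterates in sorted key order, which is what the builder expects
+    for entry in self.map.iter() {
+        builder.add(&entry.key()[..], &entry.value()[..]);
+    }
+    Ok(())
+}
+```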
+
+## Task 2: Flush Trigger
+
+In this task, you will need to modify:
+
+```
+src/lsm_storage.rs
+src/compact.rs
+```
+
+When the number of memtables (immutable + mutable) in memory exceeds the `num_memtable_limit` in the LSM storage options, you should flush the earliest memtable to the disk. This is done by a flush thread running in the background. The flush thread is started with the `MiniLsm` structure, and we have already implemented the necessary code to start it and stop it properly.
+
+In this task, you will need to implement `LsmStorageInner::trigger_flush` in `compact.rs`, and `MiniLsm::close` in `lsm_storage.rs`. `trigger_flush` will be executed every 50 milliseconds. If the number of memtables exceeds the limit, you should call `force_flush_next_imm_memtable` to flush a memtable. When the user calls the `close` function, you should wait for the flush thread (and the compaction thread in week 2) to finish.
 
 ## Task 3: Filter the SSTs
 
+Now you have a fully working storage engine, and you can use mini-lsm-cli to interact with it.
+
+```shell
+cargo run --bin mini-lsm-cli -- --compaction none
+```
+
+And then,
+
+```
+fill 1000 3000
+get 2333
+flush
+fill 1000 3000
+get 2333
+flush
+get 2333
+scan 2000 2333
+```
+
+If you fill more data, you can see your flush thread working in the background, flushing memtables to L0 SSTs without you using the `flush` command.
+
+And lastly, let us implement a simple optimization on filtering the SSTs before we end this week. Based on the key range that the user provides, we can easily filter out the SSTs that do not overlap with it, so that we do not need to read them in the merge iterator.
+
+In this task, you will need to modify:
+
+```
+src/lsm_storage.rs
+src/iterators/*
+src/lsm_iterator.rs
+```
+
+You will need to change your read path functions to skip the SSTs that cannot possibly contain the key or the key range. You will need to implement `num_active_iterators` for your iterators so that the test cases can check whether your implementation is correct. For `MergeIterator` and `TwoMergeIterator`, it is the sum of `num_active_iterators` of all child iterators. Note that if you did not modify the fields in the starter code of `MergeIterator`, remember to also take `MergeIterator::current` into account. For `LsmIterator` and `FusedIterator`, simply return the number of active iterators from the inner iterator.
+
+You can implement helper functions like `range_overlap` and `key_within` to simplify your code; a sketch of these helpers appears at the end of this chapter.
+
 ## Test Your Understanding
 
 * What happens if a user requests to delete a key twice?
+* How much memory (or how many blocks) will be loaded into memory at the same time when the iterator is initialized?
 
 We do not provide reference answers to the questions; feel free to discuss them in the Discord community.
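+
+As referenced in Task 3, here is a sketch of the two helper functions, assuming keys compare as raw byte slices and that each SST exposes its first and last keys:
+
+```rust,no_run
+use std::ops::Bound;
+
+/// Returns false only when the user range provably lies outside [table_begin, table_end].
+fn range_overlap(
+    user_begin: Bound<&[u8]>,
+    user_end: Bound<&[u8]>,
+    table_begin: &[u8],
+    table_end: &[u8],
+) -> bool {
+    match user_end {
+        Bound::Excluded(key) if key <= table_begin => return false,
+        Bound::Included(key) if key < table_begin => return false,
+        _ => {}
+    }
+    match user_begin {
+        Bound::Excluded(key) if key >= table_end => return false,
+        Bound::Included(key) if key > table_end => return false,
+        _ => {}
+    }
+    true
+}
+
+/// The point-lookup variant used by `get`.
+fn key_within(user_key: &[u8], table_begin: &[u8], table_end: &[u8]) -> bool {
+    table_begin <= user_key && user_key <= table_end
+}
+```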
diff --git a/mini-lsm/src/bin/mini_lsm_cli.rs b/mini-lsm-starter/src/bin/mini-lsm-cli.rs
similarity index 93%
rename from mini-lsm/src/bin/mini_lsm_cli.rs
rename to mini-lsm-starter/src/bin/mini-lsm-cli.rs
index 6cfd8dd7..58c45652 100644
--- a/mini-lsm/src/bin/mini_lsm_cli.rs
+++ b/mini-lsm-starter/src/bin/mini-lsm-cli.rs
@@ -1,20 +1,23 @@
-use std::path::PathBuf;
+mod wrapper;
+use wrapper::mini_lsm_wrapper;
 
 use anyhow::Result;
 use bytes::Bytes;
 use clap::{Parser, ValueEnum};
-use mini_lsm::compact::{
+use mini_lsm_wrapper::compact::{
     CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions,
     TieredCompactionOptions,
 };
-use mini_lsm::iterators::StorageIterator;
-use mini_lsm::lsm_storage::{LsmStorageOptions, MiniLsm};
+use mini_lsm_wrapper::iterators::StorageIterator;
+use mini_lsm_wrapper::lsm_storage::{LsmStorageOptions, MiniLsm};
+use std::path::PathBuf;
 
 #[derive(Debug, Clone, ValueEnum)]
 enum CompactionStrategy {
     Simple,
     Leveled,
     Tiered,
+    None,
 }
 
 #[derive(Parser, Debug)]
@@ -37,6 +40,7 @@ fn main() -> Result<()> {
         target_sst_size: 2 << 20, // 2MB
         num_memtable_limit: 3,
         compaction_options: match args.compaction {
+            CompactionStrategy::None => CompactionOptions::NoCompaction,
             CompactionStrategy::Simple => {
                 CompactionOptions::Simple(SimpleLeveledCompactionOptions {
                     size_ratio_percent: 200,
diff --git a/mini-lsm-starter/src/bin/wrapper.rs b/mini-lsm-starter/src/bin/wrapper.rs
new file mode 100644
index 00000000..81e47f3b
--- /dev/null
+++ b/mini-lsm-starter/src/bin/wrapper.rs
@@ -0,0 +1,6 @@
+pub mod mini_lsm_wrapper {
+    pub use mini_lsm_starter::*;
+}
+
+#[allow(dead_code)]
+fn main() {}
diff --git a/mini-lsm-starter/src/compact.rs b/mini-lsm-starter/src/compact.rs
index 8d48a1b4..2c2451f8 100644
--- a/mini-lsm-starter/src/compact.rs
+++ b/mini-lsm-starter/src/compact.rs
@@ -140,4 +140,27 @@ impl LsmStorageInner {
         }
         Ok(None)
     }
+
+    fn trigger_flush(&self) -> Result<()> {
+        Ok(())
+    }
+
+    pub(crate) fn spawn_flush_thread(
+        self: &Arc<Self>,
+        rx: crossbeam_channel::Receiver<()>,
+    ) -> Result<Option<std::thread::JoinHandle<()>>> {
+        let this = self.clone();
+        let handle = std::thread::spawn(move || {
+            let ticker = crossbeam_channel::tick(Duration::from_millis(50));
+            loop {
+                crossbeam_channel::select! {
+                    recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
+                        eprintln!("flush failed: {}", e);
+                    },
+                    recv(rx) -> _ => return,
+                }
+            }
+        });
+        Ok(Some(handle))
+    }
 }
diff --git a/mini-lsm-starter/src/iterators.rs b/mini-lsm-starter/src/iterators.rs
index c52aef6c..4079e2a1 100644
--- a/mini-lsm-starter/src/iterators.rs
+++ b/mini-lsm-starter/src/iterators.rs
@@ -13,4 +13,9 @@ pub trait StorageIterator {
 
     /// Move to the next position.
     fn next(&mut self) -> anyhow::Result<()>;
+
+    /// Number of underlying active iterators for this iterator.
+    fn num_active_iterators(&self) -> usize {
+        1
+    }
 }
diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs
index 3b77d943..ce890a10 100644
--- a/mini-lsm-starter/src/lsm_storage.rs
+++ b/mini-lsm-starter/src/lsm_storage.rs
@@ -79,6 +79,16 @@ impl LsmStorageOptions {
             num_memtable_limit: 50,
         }
     }
+
+    pub fn default_for_week1_day6_test() -> Self {
+        Self {
+            block_size: 4096,
+            target_sst_size: 2 << 20,
+            compaction_options: CompactionOptions::NoCompaction,
+            enable_wal: false,
+            num_memtable_limit: 2,
+        }
+    }
 }
 
 /// The storage interface of the LSM tree.
@@ -96,6 +106,10 @@ pub(crate) struct LsmStorageInner {
 
 /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
 pub struct MiniLsm {
     pub(crate) inner: Arc<LsmStorageInner>,
+    /// Notifies the L0 flush thread to stop working. (In week 1 day 6)
+    flush_notifier: crossbeam_channel::Sender<()>,
+    /// The handle for the flush thread. (In week 1 day 6)
+    flush_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
     /// Notifies the compaction thread to stop working. (In week 2)
     compaction_notifier: crossbeam_channel::Sender<()>,
     /// The handle for the compaction thread. (In week 2)
@@ -105,6 +119,7 @@ pub struct MiniLsm {
 impl Drop for MiniLsm {
     fn drop(&mut self) {
         self.compaction_notifier.send(()).ok();
+        self.flush_notifier.send(()).ok();
     }
 }
 
@@ -117,11 +132,15 @@ impl MiniLsm {
     /// not exist.
     pub fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Arc<Self>> {
         let inner = Arc::new(LsmStorageInner::open(path, options)?);
-        let (tx, rx) = crossbeam_channel::unbounded();
+        let (tx1, rx) = crossbeam_channel::unbounded();
         let compaction_thread = inner.spawn_compaction_thread(rx)?;
+        let (tx2, rx) = crossbeam_channel::unbounded();
+        let flush_thread = inner.spawn_flush_thread(rx)?;
         Ok(Arc::new(Self {
             inner,
-            compaction_notifier: tx,
+            flush_notifier: tx2,
+            flush_thread: Mutex::new(flush_thread),
+            compaction_notifier: tx1,
             compaction_thread: Mutex::new(compaction_thread),
         }))
     }
diff --git a/mini-lsm-starter/src/mem_table.rs b/mini-lsm-starter/src/mem_table.rs
index 4df3a407..12b86190 100644
--- a/mini-lsm-starter/src/mem_table.rs
+++ b/mini-lsm-starter/src/mem_table.rs
@@ -75,7 +75,7 @@ impl MemTable {
         unimplemented!()
     }
 
-    /// Flush the mem-table to SSTable.
+    /// Flush the mem-table to SSTable. Implement in week 1 day 6.
     pub fn flush(&self, _builder: &mut SsTableBuilder) -> Result<()> {
         unimplemented!()
     }
diff --git a/mini-lsm-starter/src/storage.rs b/mini-lsm-starter/src/storage.rs
deleted file mode 100644
index 8a0cef5d..00000000
--- a/mini-lsm-starter/src/storage.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub struct Storage {}
diff --git a/mini-lsm/Cargo.toml b/mini-lsm/Cargo.toml
index 89f57f9d..9788ed15 100644
--- a/mini-lsm/Cargo.toml
+++ b/mini-lsm/Cargo.toml
@@ -26,7 +26,3 @@ serde = { version = "1.0", features = ["derive"] }
 
 [dev-dependencies]
 tempfile = "3"
-
-[[bin]]
-name = "mini-lsm-cli"
-path = "src/bin/mini_lsm_cli.rs"
diff --git a/mini-lsm/src/bin/mini-lsm-cli.rs b/mini-lsm/src/bin/mini-lsm-cli.rs
new file mode 120000
index 00000000..ade5bc52
--- /dev/null
+++ b/mini-lsm/src/bin/mini-lsm-cli.rs
@@ -0,0 +1 @@
+../../../mini-lsm-starter/src/bin/mini-lsm-cli.rs
\ No newline at end of file
diff --git a/mini-lsm/src/bin/wrapper.rs b/mini-lsm/src/bin/wrapper.rs
new file mode 100644
index 00000000..9b8a4ecd
--- /dev/null
+++ b/mini-lsm/src/bin/wrapper.rs
@@ -0,0 +1,6 @@
+pub mod mini_lsm_wrapper {
+    pub use mini_lsm::*;
+}
+
+#[allow(dead_code)]
+fn main() {}
diff --git a/mini-lsm/src/tests/week1_day5.rs b/mini-lsm/src/tests/week1_day5.rs
index 9c59e690..9f91f76a 100644
--- a/mini-lsm/src/tests/week1_day5.rs
+++ b/mini-lsm/src/tests/week1_day5.rs
@@ -1,10 +1,9 @@
 use std::ops::Bound;
 
+use self::harness::generate_sst;
+use self::harness::{check_iter_result, MockIterator};
 use bytes::Bytes;
 use tempfile::tempdir;
-use week1_day5::harness::generate_sst;
-
-use self::harness::{check_iter_result, MockIterator};
 
 use super::*;
 use crate::{
diff --git a/mini-lsm/src/tests/week1_day6.rs b/mini-lsm/src/tests/week1_day6.rs
new file mode 100644
index 00000000..d196daff
--- /dev/null
+++ b/mini-lsm/src/tests/week1_day6.rs
@@ -0,0 +1,201 @@
+use std::{ops::Bound, time::Duration};
+
+use bytes::Bytes;
+use tempfile::tempdir;
+
+use self::harness::check_iter_result;
+
+use super::*; +use crate::{ + iterators::StorageIterator, + lsm_storage::{LsmStorageInner, LsmStorageOptions, MiniLsm}, +}; + +fn sync(storage: &LsmStorageInner) { + storage + .force_freeze_memtable(&storage.state_lock.lock()) + .unwrap(); + storage.force_flush_next_imm_memtable().unwrap(); +} + +#[test] +fn test_task1_storage_scan() { + let dir = tempdir().unwrap(); + let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap(); + storage.put(b"0", b"2333333").unwrap(); + storage.put(b"00", b"2333333").unwrap(); + storage.put(b"4", b"23").unwrap(); + sync(&storage); + + storage.delete(b"4").unwrap(); + sync(&storage); + + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage + .force_freeze_memtable(&storage.state_lock.lock()) + .unwrap(); + storage.put(b"00", b"2333").unwrap(); + storage + .force_freeze_memtable(&storage.state_lock.lock()) + .unwrap(); + storage.put(b"3", b"23333").unwrap(); + storage.delete(b"1").unwrap(); + + { + let state = storage.state.read(); + assert_eq!(state.l0_sstables.len(), 2); + assert_eq!(state.imm_memtables.len(), 2); + } + + check_iter_result( + &mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(), + vec![ + (Bytes::from("0"), Bytes::from("2333333")), + (Bytes::from("00"), Bytes::from("2333")), + (Bytes::from("2"), Bytes::from("2333")), + (Bytes::from("3"), Bytes::from("23333")), + ], + ); + check_iter_result( + &mut storage + .scan(Bound::Included(b"1"), Bound::Included(b"2")) + .unwrap(), + vec![(Bytes::from("2"), Bytes::from("2333"))], + ); + check_iter_result( + &mut storage + .scan(Bound::Excluded(b"1"), Bound::Excluded(b"3")) + .unwrap(), + vec![(Bytes::from("2"), Bytes::from("2333"))], + ); +} + +#[test] +fn test_task1_storage_get() { + let dir = tempdir().unwrap(); + let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap(); + storage.put(b"0", b"2333333").unwrap(); + storage.put(b"00", b"2333333").unwrap(); + storage.put(b"4", b"23").unwrap(); + sync(&storage); + + storage.delete(b"4").unwrap(); + sync(&storage); + + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage + .force_freeze_memtable(&storage.state_lock.lock()) + .unwrap(); + storage.put(b"00", b"2333").unwrap(); + storage + .force_freeze_memtable(&storage.state_lock.lock()) + .unwrap(); + storage.put(b"3", b"23333").unwrap(); + storage.delete(b"1").unwrap(); + + { + let state = storage.state.read(); + assert_eq!(state.l0_sstables.len(), 2); + assert_eq!(state.imm_memtables.len(), 2); + } + + assert_eq!( + storage.get(b"0").unwrap(), + Some(Bytes::from_static(b"2333333")) + ); + assert_eq!( + storage.get(b"00").unwrap(), + Some(Bytes::from_static(b"2333")) + ); + assert_eq!( + storage.get(b"2").unwrap(), + Some(Bytes::from_static(b"2333")) + ); + assert_eq!( + storage.get(b"3").unwrap(), + Some(Bytes::from_static(b"23333")) + ); + assert_eq!(storage.get(b"4").unwrap(), None); + assert_eq!(storage.get(b"--").unwrap(), None); + assert_eq!(storage.get(b"555").unwrap(), None); +} + +#[test] +fn test_task2_auto_flush() { + let dir = tempdir().unwrap(); + let storage = MiniLsm::open(&dir, LsmStorageOptions::default_for_week1_day6_test()).unwrap(); + + let value = "1".repeat(1024); // 1KB + + // approximately 6MB + for i in 0..6000 { + storage + .put(format!("{i}").as_bytes(), value.as_bytes()) + .unwrap(); + } + + std::thread::sleep(Duration::from_millis(500)); + + assert!(!storage.inner.state.read().l0_sstables.is_empty()); +} + +#[test] +fn 
test_task3_sst_filter() { + let dir = tempdir().unwrap(); + let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap(); + + for i in 1..=10000 { + if i % 1000 == 0 { + sync(&storage); + } + storage + .put(format!("{:05}", i).as_bytes(), b"2333333") + .unwrap(); + } + + let iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); + assert!( + iter.num_active_iterators() >= 10, + "did you implement num_active_iterators? current active iterators = {}", + iter.num_active_iterators() + ); + let max_num = iter.num_active_iterators(); + let iter = storage + .scan( + Bound::Excluded(format!("{:05}", 10000).as_bytes()), + Bound::Unbounded, + ) + .unwrap(); + assert!(iter.num_active_iterators() < max_num); + let min_num = iter.num_active_iterators(); + let iter = storage + .scan( + Bound::Unbounded, + Bound::Excluded(format!("{:05}", 1).as_bytes()), + ) + .unwrap(); + assert_eq!(iter.num_active_iterators(), min_num); + let iter = storage + .scan( + Bound::Unbounded, + Bound::Included(format!("{:05}", 0).as_bytes()), + ) + .unwrap(); + assert_eq!(iter.num_active_iterators(), min_num); + let iter = storage + .scan( + Bound::Included(format!("{:05}", 10001).as_bytes()), + Bound::Unbounded, + ) + .unwrap(); + assert_eq!(iter.num_active_iterators(), min_num); + let iter = storage + .scan( + Bound::Included(format!("{:05}", 5000).as_bytes()), + Bound::Excluded(format!("{:05}", 6000).as_bytes()), + ) + .unwrap(); + assert!(min_num < iter.num_active_iterators() && iter.num_active_iterators() < max_num); +}
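
A closing note on Task 2: the `trigger_flush` stub added to `compact.rs` above is left for the reader to implement. A minimal sketch, assuming the `num_memtable_limit` option counts the mutable memtable together with the immutable ones (as the chapter text suggests):

```rust,no_run
fn trigger_flush(&self) -> Result<()> {
    let should_flush = {
        let guard = self.state.read();
        // +1 accounts for the current mutable memtable
        guard.imm_memtables.len() + 1 > self.options.num_memtable_limit
    };
    // flush outside the read lock; force_flush_next_imm_memtable takes its own locks
    if should_flush {
        self.force_flush_next_imm_memtable()?;
    }
    Ok(())
}
```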