diff --git a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
index befb340379d9..e86ffee9e9b6 100644
--- a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
+++ b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
@@ -354,7 +354,8 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
         }
     }
 
-    // Works only if there are no pending entries
+    /// Works only if there are no pending entries.
+    /// Use flush() to retrieve the phys when there are pending entries.
     pub fn get_phys(&self) -> BlockBasedLogWithSummaryPhys<T> {
         assert!(self.this.pending_entries.is_empty());
         assert!(self.chunk_summary.pending_entries.is_empty());
diff --git a/cmd/zfs_object_agent/zettacache/src/index.rs b/cmd/zfs_object_agent/zettacache/src/index.rs
index 43aed5ca728c..2db47dce57d5 100644
--- a/cmd/zfs_object_agent/zettacache/src/index.rs
+++ b/cmd/zfs_object_agent/zettacache/src/index.rs
@@ -3,6 +3,7 @@ use crate::block_access::*;
 use crate::block_based_log::*;
 use crate::extent_allocator::ExtentAllocator;
 use crate::zettacache::AtimeHistogramPhys;
+use more_asserts::*;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 
@@ -40,8 +41,9 @@ pub struct IndexEntry {
 impl OnDisk for IndexEntry {}
 impl BlockBasedLogEntry for IndexEntry {}
 
-#[derive(Serialize, Deserialize, Debug, Default)]
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
 pub struct ZettaCacheIndexPhys {
+    last_key: Option<IndexKey>,
     atime_histogram: AtimeHistogramPhys,
     log: BlockBasedLogWithSummaryPhys<IndexEntry>,
 }
 
@@ -49,6 +51,7 @@ pub struct ZettaCacheIndexPhys {
 impl ZettaCacheIndexPhys {
     pub fn new(min_atime: Atime) -> Self {
         Self {
+            last_key: None,
             atime_histogram: AtimeHistogramPhys::new(min_atime),
             log: Default::default(),
         }
@@ -56,10 +59,19 @@ impl ZettaCacheIndexPhys {
 }
 
 pub struct ZettaCacheIndex {
+    pub last_key: Option<IndexKey>,
     pub atime_histogram: AtimeHistogramPhys,
     pub log: BlockBasedLogWithSummary<IndexEntry>,
 }
 
+impl std::fmt::Debug for ZettaCacheIndex {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ZettaCacheIndex")
+            .field("last_key", &self.last_key)
+            .finish()
+    }
+}
+
 impl ZettaCacheIndex {
     pub async fn open(
         block_access: Arc<BlockAccess>,
@@ -67,6 +79,7 @@ impl ZettaCacheIndex {
         phys: ZettaCacheIndexPhys,
     ) -> Self {
         Self {
+            last_key: phys.last_key,
             atime_histogram: phys.atime_histogram,
             log: BlockBasedLogWithSummary::open(block_access, extent_allocator, phys.log).await,
         }
@@ -74,28 +87,41 @@ impl ZettaCacheIndex {
 
     pub async fn flush(&mut self) -> ZettaCacheIndexPhys {
         ZettaCacheIndexPhys {
+            last_key: self.last_key,
             atime_histogram: self.atime_histogram.clone(),
             log: self.log.flush().await,
         }
     }
 
+    /// Retrieve the index phys. This only works if there are no pending log entries.
+    /// Use flush() to retrieve the phys when there are pending entries.
     pub fn get_phys(&self) -> ZettaCacheIndexPhys {
         ZettaCacheIndexPhys {
+            last_key: self.last_key,
             atime_histogram: self.atime_histogram.clone(),
             log: self.log.get_phys(),
         }
     }
 
-    pub fn get_histogram_start(&self) -> Atime {
-        self.atime_histogram.get_start()
+    pub fn first_atime(&self) -> Atime {
+        self.atime_histogram.first()
+    }
+
+    pub fn update_last_key(&mut self, key: IndexKey) {
+        if let Some(last_key) = self.last_key {
+            assert_gt!(key, last_key);
+        }
+        self.last_key = Some(key);
     }
 
     pub fn append(&mut self, entry: IndexEntry) {
+        self.update_last_key(entry.key);
         self.atime_histogram.insert(entry.value);
         self.log.append(entry);
     }
 
     pub fn clear(&mut self) {
+        self.last_key = None;
         self.atime_histogram.clear();
         self.log.clear();
     }
diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
index e6d935e6f300..c3e65b9cbaf1 100644
--- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs
+++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -27,12 +27,14 @@ use serde::{Deserialize, Serialize};
 use std::collections::btree_map;
 use std::collections::BTreeMap;
 use std::convert::TryFrom;
+use std::ops::Bound::{Excluded, Unbounded};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
 use tokio::sync::OwnedSemaphorePermit;
 use tokio::sync::Semaphore;
+use tokio::time::{sleep_until, timeout_at};
 
 lazy_static! {
     static ref SUPERBLOCK_SIZE: u64 = get_tunable("superblock_size", 4 * 1024);
@@ -40,6 +42,8 @@ lazy_static! {
     pub static ref DEFAULT_SLAB_SIZE: u32 = get_tunable("default_slab_size", 16 * 1024 * 1024);
     static ref DEFAULT_METADATA_SIZE_PCT: f64 = get_tunable("default_metadata_size_pct", 15.0); // Can lower this to test forced eviction.
     static ref MAX_PENDING_CHANGES: usize = get_tunable("max_pending_changes", 50_000); // XXX should be based on RAM usage, ~tens of millions at least
+    static ref CHECKPOINT_INTERVAL: Duration = Duration::from_secs(get_tunable("checkpoint_interval_secs", 60));
+    static ref MERGE_PROGRESS_MESSAGE_INTERVAL: Duration = Duration::from_millis(get_tunable("merge_progress_message_interval_ms", 1000));
     static ref TARGET_CACHE_SIZE_PCT: u64 = get_tunable("target_cache_size_pct", 80);
     static ref HIGH_WATER_CACHE_SIZE_PCT: u64 = get_tunable("high_water_cache_size_pct", 82);
 
@@ -86,11 +90,10 @@ struct ZettaCheckpointPhys {
     generation: CheckpointId,
     extent_allocator: ExtentAllocatorPhys,
     block_allocator: BlockAllocatorPhys,
-    //last_valid_data_offset: u64, // XXX move to BlockAllocatorPhys
     last_atime: Atime,
     index: ZettaCacheIndexPhys,
     operation_log: BlockBasedLogPhys<OperationLogEntry>,
-    merging_operation_log: Option<BlockBasedLogPhys<OperationLogEntry>>,
+    merge_progress: Option<(BlockBasedLogPhys<OperationLogEntry>, ZettaCacheIndexPhys)>,
 }
 
 impl ZettaCheckpointPhys {
@@ -147,55 +150,55 @@ impl BlockBasedLogEntry for OperationLogEntry {}
 
 #[derive(Serialize, Deserialize, Debug, Default, Clone)]
 pub struct AtimeHistogramPhys {
     histogram: Vec<u64>,
-    start: Atime,
+    first: Atime,
 }
 
 impl AtimeHistogramPhys {
-    pub fn new(start: Atime) -> AtimeHistogramPhys {
+    pub fn new(first: Atime) -> AtimeHistogramPhys {
         AtimeHistogramPhys {
             histogram: Default::default(),
-            start,
+            first,
         }
     }
 
-    pub fn get_start(&self) -> Atime {
-        self.start
+    pub fn first(&self) -> Atime {
+        self.first
     }
 
     /// Reset the start to a later atime, discarding older entries.
     /// Requests to reset to an earlier atime are ignored.
-    pub fn reset_start(&mut self, new_start: Atime) {
-        if new_start <= self.start {
+    pub fn reset_first(&mut self, new_first: Atime) {
+        if new_first <= self.first {
             return;
         }
-        let delta = new_start - self.start;
+        let delta = new_first - self.first;
         // XXX - if this becomes a bottleneck we should change the histogram to a VecDeque
         // so that we don't have to copy when deleting the head of the histogram
         self.histogram.drain(0..delta);
-        self.start = new_start;
+        self.first = new_first;
     }
 
     pub fn atime_for_target_size(&self, target_size: u64) -> Atime {
         info!(
             "histogram starts at {:?} and has {} entries",
-            self.start,
+            self.first,
             self.histogram.len()
         );
         let mut remaining = target_size;
         for (index, &bytes) in self.histogram.iter().enumerate().rev() {
             if remaining <= bytes {
                 trace!("final include of {} for target at bucket {}", bytes, index);
-                return Atime(self.start.0 + index as u64);
+                return Atime(self.first.0 + index as u64);
             }
             trace!("including {} in target at bucket {}", bytes, index);
             remaining -= bytes;
         }
-        self.start
+        self.first
     }
 
     pub fn insert(&mut self, value: IndexValue) {
-        assert_ge!(value.atime, self.start);
-        let index = value.atime - self.start;
+        assert_ge!(value.atime, self.first);
+        let index = value.atime - self.first;
         if index >= self.histogram.len() {
             self.histogram.resize(index + 1, 0);
         }
@@ -203,8 +206,8 @@ impl AtimeHistogramPhys {
     }
 
     pub fn remove(&mut self, value: IndexValue) {
-        assert_ge!(value.atime, self.start);
-        let index = value.atime - self.start;
+        assert_ge!(value.atime, self.first);
+        let index = value.atime - self.first;
         self.histogram[index] -= u64::from(value.size);
     }
 
@@ -217,32 +220,71 @@ impl AtimeHistogramPhys {
     }
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+struct MergeProgress {
+    new_index: ZettaCacheIndexPhys,
+    free_list: Vec<IndexValue>,
+}
+
+// #[derive(Serialize, Deserialize)]
+#[derive(Debug)]
+enum MergeMessage {
+    Progress(MergeProgress),
+    Complete(ZettaCacheIndex),
+}
+
+impl MergeMessage {
+    /// Compose a progress update to send to the checkpoint task.
+    async fn new_progress(next_index: &mut ZettaCacheIndex, free_list: Vec<IndexValue>) -> Self {
+        let timer = Instant::now();
+        let free_count = free_list.len();
+        let message = MergeProgress {
+            new_index: next_index.flush().await,
+            free_list,
+        };
+        trace!(
+            "sending progress: index with {} entries ({}MB) last is {:?} flushed in {}ms, and {} frees",
+            next_index.log.len(),
+            next_index.log.num_bytes() / 1024 / 1024,
+            next_index.last_key,
+            timer.elapsed().as_millis(),
+            free_count,
+        );
+        Self::Progress(message)
+    }
+}
+
 struct MergeState {
     old_pending_changes: BTreeMap<IndexKey, PendingChange>,
     old_operation_log_phys: BlockBasedLogPhys<OperationLogEntry>,
+    eviction_cutoff: Atime,
 }
 
 impl MergeState {
-    async fn merge_pending_state(
+    /// Add an entry to the new index, or evict it if its atime is before the eviction cutoff.
+    fn add_to_index_or_evict(
         &self,
-        old_index: &ZettaCacheIndex,
-        new_index: &mut ZettaCacheIndex,
-    ) -> Vec<IndexEntry> {
-        // Helper function to add an entry being merged to the new index or,
-        // if due to eviction, place it on the free list.
-        fn add_to_index_or_list(
-            index: &mut ZettaCacheIndex,
-            list: &mut Vec<IndexEntry>,
-            entry: IndexEntry,
-        ) {
-            if entry.value.atime >= index.get_histogram_start() {
-                index.append(entry);
-            } else {
-                list.push(entry);
-            }
+        entry: IndexEntry,
+        index: &mut ZettaCacheIndex,
+        free_list: &mut Vec<IndexValue>,
+    ) {
+        trace!("add-or-evict {:?}", entry);
+        if entry.value.atime >= self.eviction_cutoff {
+            index.append(entry);
+        } else {
+            free_list.push(entry.value);
+            index.update_last_key(entry.key);
         }
+    }
 
+    /// This function runs in an async task to merge a set of pending changes with the current on-disk
+    /// index in order to produce a new up-to-date on-disk index. It sends periodic "progress updates"
+    /// (including block frees) to the checkpoint task.
+    async fn merge_task(
+        &self,
+        tx: tokio::sync::mpsc::Sender<MergeMessage>,
+        old_index: Arc<tokio::sync::RwLock<ZettaCacheIndex>>,
+        mut next_index: ZettaCacheIndex,
+    ) {
         let begin = Instant::now();
+        let old_index = old_index.read().await;
         info!(
             "writing new index to merge {} pending changes into index of {} entries ({} MB)",
             self.old_pending_changes.len(),
@@ -250,123 +292,152 @@ impl MergeState {
             old_index.log.num_bytes() / 1024 / 1024,
         );
 
-        // XXX load operation_log and verify that the pending_changes match it?
-        let mut free_list: Vec<IndexEntry> = Vec::new();
-        let mut pending_changes_iter = self.old_pending_changes.iter().peekable();
-        old_index
-            .log
-            .iter()
-            .for_each(|entry| {
-                // First, process any pending changes which are before this
-                // index entry, which must be all Inserts (Removes,
-                // RemoveThenInserts, and AtimeUpdates refer to existing Index
-                // entries).
-                trace!("next index entry: {:?}", entry);
-                while let Some((&pc_key, &PendingChange::Insert(pc_value))) =
-                    pending_changes_iter.peek()
-                {
-                    if pc_key >= entry.key {
-                        break;
-                    }
-                    // Add this new entry to the index
-                    add_to_index_or_list(
-                        new_index,
-                        &mut free_list,
-                        IndexEntry {
-                            key: pc_key,
-                            value: pc_value,
-                        },
-                    );
-                    pending_changes_iter.next();
-                }
+        let mut free_list: Vec<IndexValue> = Vec::new();
+        let mut timer = Instant::now();
+
+        let start_key = next_index.last_key;
+        info!("using {:?} as start key for merge", start_key);
+        let mut pending_changes_iter = self
+            .old_pending_changes
+            .range((start_key.map_or(Unbounded, Excluded), Unbounded))
+            .peekable();
+
+        let mut index_stream = Box::pin(old_index.log.iter());
+        let mut index_skips = 0;
+        while let Some(entry) = index_stream.next().await {
+            if timer.elapsed() >= *MERGE_PROGRESS_MESSAGE_INTERVAL {
+                // send free_list and current index phys to checkpointer
+                tx.send(MergeMessage::new_progress(&mut next_index, free_list).await)
+                    .await
+                    .unwrap_or_else(|e| panic!("couldn't send: {}", e));
+                free_list = Vec::new();
+                timer = Instant::now();
+            }
+            // If the next index is already "started", advance the old index to the start point
+            // XXX - would be nice to simply *start* from the start_key, rather than iterate up to it
+            if let Some(start_key) = start_key {
+                if entry.key <= start_key {
+                    trace!("skipping index entry: {:?}", entry.key);
+                    index_skips += 1;
+                    continue;
                }
+            }
+            // First, process any pending changes which are before this
+            // index entry, which must be all Inserts (Removes,
+            // RemoveThenInserts, and AtimeUpdates refer to existing Index
+            // entries).
+ trace!("next index entry: {:?}", entry); + while let Some((&pc_key, &PendingChange::Insert(pc_value))) = + pending_changes_iter.peek() + { + if pc_key >= entry.key { + break; + } + // Add this new entry to the index + self.add_to_index_or_evict( + IndexEntry { + key: pc_key, + value: pc_value, + }, + &mut next_index, + &mut free_list, + ); + pending_changes_iter.next(); + } - let next_pc_opt = pending_changes_iter.peek(); - match next_pc_opt { - Some((&pc_key, &PendingChange::Remove())) => { - if pc_key == entry.key { - // Don't write this entry to the new generation. - // this pending change is consumed - pending_changes_iter.next(); - } else { - // There shouldn't be a pending removal of an entry that doesn't exist in the index. - assert_gt!(pc_key, entry.key); - add_to_index_or_list(new_index, &mut free_list, entry); - } - } - Some((&pc_key, &PendingChange::Insert(_pc_value))) => { - // Insertions are processed above. There can't be an - // index entry with the same key. If there were, it has - // to be removed first, resulting in a - // PendingChange::RemoveThenInsert. + let next_pc_opt = pending_changes_iter.peek(); + match next_pc_opt { + Some((&pc_key, &PendingChange::Remove())) => { + if pc_key == entry.key { + // Don't write this entry to the new generation. + // this pending change is consumed + pending_changes_iter.next(); + } else { + // There shouldn't be a pending removal of an entry that doesn't exist in the index. assert_gt!(pc_key, entry.key); - add_to_index_or_list(new_index, &mut free_list, entry); + self.add_to_index_or_evict(entry, &mut next_index, &mut free_list); } - Some((&pc_key, &PendingChange::RemoveThenInsert(pc_value))) => { - if pc_key == entry.key { - // This key must have been removed (evicted) and then re-inserted. - // Add the pending change to the next generation instead of the current index's entry - assert_eq!(pc_value.size, entry.value.size); - add_to_index_or_list( - new_index, - &mut free_list, - IndexEntry { - key: pc_key, - value: pc_value, - }, - ); - - // this pending change is consumed - pending_changes_iter.next(); - } else { - // We shouldn't have skipped any, because there has to be a corresponding Index entry - assert_gt!(pc_key, entry.key); - add_to_index_or_list(new_index, &mut free_list, entry); - } - } - Some((&pc_key, &PendingChange::UpdateAtime(pc_value))) => { - if pc_key == entry.key { - // Add the pending entry to the next generation instead of the current index's entry - assert_eq!(pc_value.location, entry.value.location); - assert_eq!(pc_value.size, entry.value.size); - add_to_index_or_list( - new_index, - &mut free_list, - IndexEntry { - key: pc_key, - value: pc_value, - }, - ); - - // this pending change is consumed - pending_changes_iter.next(); - } else { - // We shouldn't have skipped any, because there has to be a corresponding Index entry - assert_gt!(pc_key, entry.key); - add_to_index_or_list(new_index, &mut free_list, entry); - } + } + Some((&pc_key, &PendingChange::Insert(_pc_value))) => { + // Insertions are processed above. There can't be an + // index entry with the same key. If there were, it has + // to be removed first, resulting in a + // PendingChange::RemoveThenInsert. + assert_gt!(pc_key, entry.key); + self.add_to_index_or_evict(entry, &mut next_index, &mut free_list); + } + Some((&pc_key, &PendingChange::RemoveThenInsert(pc_value))) => { + if pc_key == entry.key { + // This key must have been removed (evicted) and then re-inserted. 
+ // Add the pending change to the next generation instead of the current index's entry + assert_eq!(pc_value.size, entry.value.size); + self.add_to_index_or_evict( + IndexEntry { + key: pc_key, + value: pc_value, + }, + &mut next_index, + &mut free_list, + ); + + // this pending change is consumed + pending_changes_iter.next(); + } else { + // We shouldn't have skipped any, because there has to be a corresponding Index entry + assert_gt!(pc_key, entry.key); + self.add_to_index_or_evict(entry, &mut next_index, &mut free_list); } - None => { - // no more pending changes - add_to_index_or_list(new_index, &mut free_list, entry); + } + Some((&pc_key, &PendingChange::UpdateAtime(pc_value))) => { + if pc_key == entry.key { + // Add the pending entry to the next generation instead of the current index's entry + assert_eq!(pc_value.location, entry.value.location); + assert_eq!(pc_value.size, entry.value.size); + self.add_to_index_or_evict( + IndexEntry { + key: pc_key, + value: pc_value, + }, + &mut next_index, + &mut free_list, + ); + + // this pending change is consumed + pending_changes_iter.next(); + } else { + // We shouldn't have skipped any, because there has to be a corresponding Index entry + assert_gt!(pc_key, entry.key); + self.add_to_index_or_evict(entry, &mut next_index, &mut free_list); } } - future::ready(()) - }) - .await; + None => { + // no more pending changes + self.add_to_index_or_evict(entry, &mut next_index, &mut free_list); + } + } + } while let Some((&pc_key, &PendingChange::Insert(pc_value))) = pending_changes_iter.peek() { + if timer.elapsed() >= *MERGE_PROGRESS_MESSAGE_INTERVAL { + // send free_list and current index phys to checkpointer + tx.send(MergeMessage::new_progress(&mut next_index, free_list).await) + .await + .unwrap_or_else(|e| panic!("couldn't send: {}", e)); + free_list = Vec::new(); + timer = Instant::now(); + } // Add this new entry to the index trace!( "remaining pending change, appending to new index: {:?} {:?}", pc_key, pc_value ); - add_to_index_or_list( - new_index, - &mut free_list, + self.add_to_index_or_evict( IndexEntry { key: pc_key, value: pc_value, }, + &mut next_index, + &mut free_list, ); // Consume pending change. 
             // because we want to leave any unmatched items in the iterator so
             // that we can assert below that we've consumed them all.
             pending_changes_iter.next();
         }
         assert!(
             pending_changes_iter.peek().is_none(),
             "next={:?}",
             pending_changes_iter.peek().unwrap()
         );
+        info!("skipped {} index entries", index_skips);
 
-        new_index.flush().await;
+        drop(old_index);
+        next_index.flush().await;
 
-        debug!("new histogram: {:#?}", new_index.atime_histogram);
+        debug!("new histogram: {:#?}", next_index.atime_histogram);
 
         info!(
-            "wrote new index with {} entries ({} MB) in {:.1}s ({:.1}MB/s)",
-            new_index.log.len(),
-            new_index.log.num_bytes() / 1024 / 1024,
+            "wrote next index with {} entries ({} MB) in {:.1}s ({:.1}MB/s)",
+            next_index.log.len(),
+            next_index.log.num_bytes() / 1024 / 1024,
             begin.elapsed().as_secs_f64(),
-            (new_index.log.num_bytes() as f64 / 1024f64 / 1024f64) / begin.elapsed().as_secs_f64(),
+            (next_index.log.num_bytes() as f64 / 1024f64 / 1024f64) / begin.elapsed().as_secs_f64(),
         );
-        free_list
+
+        // send the now complete next_index as the final message
+        tx.send(MergeMessage::Complete(next_index))
+            .await
+            .unwrap_or_else(|e| panic!("couldn't send: {}", e));
+        trace!("sent final checkpoint message");
     }
 }
 
@@ -399,7 +477,7 @@ struct ZettaCacheState {
     super_phys: ZettaSuperBlockPhys,
     block_allocator: BlockAllocator,
     pending_changes: BTreeMap<IndexKey, PendingChange>,
-    // keep state associated with any on-going merge here
+    // Keep state associated with any ongoing merge here
     merging_state: Option<Arc<MergeState>>,
     // XXX Given that we have to lock the entire State to do anything, we might
     // get away with this being a Rc? And the ExtentAllocator doesn't really
@@ -446,9 +524,9 @@ impl ZettaCache {
             },
             index: Default::default(),
             operation_log: Default::default(),
-            merging_operation_log: None,
             last_atime: Atime(0),
             block_allocator: BlockAllocatorPhys::new(data_start, block_access.size() - data_start),
+            merge_progress: None,
         };
         let raw = block_access.chunk_to_raw(EncodeType::Json, &checkpoint);
         assert_le!(raw.len(), *DEFAULT_CHECKPOINT_RING_BUFFER_SIZE as usize);
@@ -521,33 +599,10 @@ impl ZettaCache {
         let pending_changes = Self::load_operation_log(&operation_log, &mut atime_histogram).await;
         debug!("atime_histogram: {:#?}", atime_histogram);
 
-        // If we had a merge in progress, reconstruct the merging state and update the atime histogram
-        let merging_state = match checkpoint.merging_operation_log {
-            Some(old_operation_log_phys) => {
-                let merge_operation_log = BlockBasedLog::open(
-                    block_access.clone(),
-                    extent_allocator.clone(),
-                    old_operation_log_phys.clone(),
-                );
-                let old_pending_changes =
-                    Self::load_operation_log(&merge_operation_log, &mut atime_histogram).await;
-                Some(Arc::new(MergeState {
-                    old_pending_changes,
-                    old_operation_log_phys,
-                }))
-            }
-            None => None,
-        };
-        // let merging_state =
-        //     checkpoint
-        //         .merging_operation_log
-        //         .map(async move |old_operation_log_phys| {
-        //         });
-
         let state = ZettaCacheState {
             block_access: block_access.clone(),
             pending_changes,
-            merging_state,
+            merging_state: None,
             atime_histogram,
             operation_log,
             super_phys: phys,
@@ -572,19 +627,27 @@ impl ZettaCache {
             outstanding_inserts: Arc::new(Semaphore::new(*CACHE_INSERT_MAX_BUFFER)),
         };
 
-        let merge_cache = this.clone();
-        tokio::spawn(async move {
-            merge_cache.continuous_merge().await;
-        });
+        let (merge_rx, merge_index) = match checkpoint.merge_progress {
+            Some((old_operation_log_phys, next_index)) => (
+                Some(
+                    this.state
+                        .lock()
+                        .await
+                        .resume_merge_task(
+                            this.index.clone(),
+                            next_index.clone(),
+                            old_operation_log_phys,
+                        )
+                        .await,
+                ),
+                Some(next_index),
+            ),
+            None => (None, None),
+        };
 
         let my_cache = this.clone();
         tokio::spawn(async move {
-            let mut interval = tokio::time::interval(Duration::from_secs(60));
-            loop {
-                interval.tick().await;
-                let index = my_cache.index.read().await;
-                my_cache.state.lock().await.flush_checkpoint(&index).await;
-            }
+            my_cache.checkpoint_task(merge_rx, merge_index).await;
         });
 
         let my_cache = this.clone();
@@ -694,6 +757,93 @@ impl ZettaCache {
         pending_changes
     }
 
+    /// The checkpoint task is primarily responsible for writing out a persistent checkpoint
+    /// every CHECKPOINT_INTERVAL (60 seconds by default). It is also responsible for kicking
+    /// off a merge task whenever we accumulate enough pending changes. While a merge task is
+    /// running, this task listens for and processes eviction requests from the merge task.
+    /// The active merge task state is also captured in each checkpoint so that the merge may
+    /// be resumed from the checkpoint if necessary. On resume, the merge task is restarted
+    /// during cache open, and a channel to the task and the index phys for the current
+    /// progress are passed in here.
+    async fn checkpoint_task(
+        &self,
+        mut merge_rx: Option<tokio::sync::mpsc::Receiver<MergeMessage>>,
+        mut next_index: Option<ZettaCacheIndexPhys>,
+    ) {
+        let mut next_tick = tokio::time::Instant::now();
+        loop {
+            next_tick = std::cmp::max(
+                tokio::time::Instant::now(),
+                next_tick + *CHECKPOINT_INTERVAL,
+            );
+            // if there is no current merging state, check to see if a merge should be started
+            if self.state.lock().await.merging_state.is_none() {
+                assert!(merge_rx.is_none());
+                assert!(next_index.is_none());
+                merge_rx = self
+                    .state
+                    .lock()
+                    .await
+                    .try_start_merge_task(self.index.clone())
+                    .await;
+            }
+            if let Some(rx) = &mut merge_rx {
+                let mut msg_count = 0;
+                let mut free_count = 0;
+                // we have a channel to an active merge task, check it for messages
+                loop {
+                    let result = timeout_at(next_tick, rx.recv()).await;
+                    match result {
+                        // capture merge progress: the current next index phys and eviction requests
+                        Ok(Some(MergeMessage::Progress(merge_checkpoint))) => {
+                            msg_count += 1;
+                            free_count += merge_checkpoint.free_list.len();
+                            trace!(
+                                "merge checkpoint with {} free requests",
+                                merge_checkpoint.free_list.len()
+                            );
+                            next_index = Some(merge_checkpoint.new_index);
+                            // free the extent ranges associated with the evicted blocks
+                            // XXX - should check to see if the extent is still in the "coverage" area.
+                            // it seems possible that the metadata area could grow during the merge cycle.
+                            for value in merge_checkpoint.free_list {
+                                trace!("eviction requested for {:?}", value);
+                                self.state.lock().await.block_allocator.free(value.extent());
+                            }
+                        }
+                        // merge task complete, replace the current index with the new index
+                        Ok(Some(MergeMessage::Complete(new_index))) => {
+                            let mut index = self.index.write().await;
+                            self.state
+                                .lock()
+                                .await
+                                .rotate_index(&mut index, new_index)
+                                .await;
+                            next_index = None;
+                            merge_rx = None;
+                            break;
+                        }
+                        Ok(None) => panic!("channel closed before Complete message received"),
+                        Err(_) => break, // timed out
+                    }
+                }
+                debug!(
+                    "processed {} merge checkpoints with {} evictions requested",
+                    msg_count, free_count,
+                );
+            }
+
+            // flush out a new checkpoint every CHECKPOINT_INTERVAL to capture the current state
+            sleep_until(next_tick).await;
+            {
+                let index = self.index.read().await;
+                self.state
+                    .lock()
+                    .await
+                    .flush_checkpoint(&index, next_index.clone())
+                    .await;
+            }
+        }
+    }
+
     #[measure(HitCount)]
     fn cache_miss_without_index_read(&self, key: &IndexKey) {
         trace!("cache miss without reading index for {:?}", key);
@@ -724,16 +874,15 @@ impl ZettaCache {
     #[measure(Throughput)]
     #[measure(HitCount)]
     pub async fn lookup(&self, guid: PoolGuid, block: BlockId) -> LookupResponse {
-        // We want to hold the index lock over the whole operation so that the
-        // on-disk index can't change after we get the value from it. Lock
-        // ordering requres that we lock the index before locking the state.
+        // Hold the index lock over the whole operation
+        // so that the index can't change after we get the value from it.
+        // Lock ordering requires that we lock the index before locking the state.
         let key = IndexKey { guid, block };
         let locked_key = LockedKey(self.outstanding_lookups.lock(key).await);
         let index = self.index.read().await;
         let read_data_fut_opt = {
-            // We don't want to hold the state lock while reading from disk. We
-            // use lock_state_non_async() to ensure that we can't hold it across
-            // .await.
+            // We don't want to hold the state lock while reading from disk so we
+            // use lock_non_send() to ensure that we can't hold it across .await.
             let mut state = self.state.lock_non_send().await;
 
             match state.pending_changes.get(&key).copied() {
@@ -750,20 +899,30 @@ impl ZettaCache {
                 }
                 None => {
                     // No pending change in current state; need to look in merging state
-                    state.merging_state.clone().and_then(|ms| {
+                    if let Some(ms) = &state.merging_state {
+                        let eviction_cutoff = ms.eviction_cutoff;
                         ms.old_pending_changes
                             .get(&key)
                             .copied()
                             .map(|pc| match pc {
                                 PendingChange::Insert(value)
                                 | PendingChange::RemoveThenInsert(value)
-                                | PendingChange::UpdateAtime(value) => state.lookup(key, value),
+                                | PendingChange::UpdateAtime(value) => {
+                                    // if this block's atime is before the eviction cutoff, return none
+                                    if value.atime >= eviction_cutoff {
+                                        state.lookup(key, value)
+                                    } else {
+                                        data_reader_none()
+                                    }
+                                }
                                 PendingChange::Remove() => {
                                     // Pending change says this has been removed
                                     data_reader_none()
                                 }
                             })
-                    })
+                    } else {
+                        None
+                    }
                 }
             }
         };
@@ -783,7 +942,6 @@ impl ZettaCache {
         }
 
         trace!("lookup has no pending_change; checking index for {:?}", key);
-        // read value from on-disk index
         match index.log.lookup_by_key(&key, |entry| entry.key).await {
             None => {
                 // key not in index
                 self.cache_miss_after_index_read(&key);
                 LookupResponse::Absent(locked_key)
             }
             Some(entry) => {
-                // read data from location indicated by index
+                // Again, we don't want to hold the state lock while reading from disk so
+                // we use lock_non_send() to ensure that we can't hold it across .await.
+                let read_data_fut = {
+                    let mut state = self.state.lock_non_send().await;
+
+                    if let Some(ms) = &state.merging_state {
+                        if entry.value.atime < ms.eviction_cutoff {
+                            // Block is being evicted, abort the read attempt
+                            self.cache_miss_after_index_read(&key);
+                            return LookupResponse::Absent(locked_key);
+                        }
+                    }
 
-                // We don't want to hold the state lock while reading from disk.
-                // We use lock_state_non_async() to ensure that we can't hold it
-                // across .await.
-                let read_data_fut = self
-                    .state
-                    .lock_non_send()
-                    .await
-                    .lookup_with_value_from_index(key, Some(entry.value));
+                    state.lookup_with_value_from_index(key, Some(entry.value))
+                };
+
+                // read data from location indicated by index
                 match read_data_fut.await {
                     Some((vec, value)) => {
                         self.cache_hit_after_index_read(&key);
@@ -882,41 +1047,6 @@ impl ZettaCache {
             }
         }
     }
-
-    async fn continuous_merge(&self) {
-        let mut interval = tokio::time::interval(Duration::from_secs(60));
-        loop {
-            interval.tick().await;
-            trace!("starting continuous merge cycle");
-
-            // Start by getting the current pending state
-            let merging = self.state.lock().await.get_merge_state().await;
-            if let Some((merging_state, mut new_index)) = merging {
-                // Now merge the pending state into the index with only the read lock.
-                // This constructs a new index and returns a list of blocks freed due to eviction.
-                let free_list = {
-                    let index = self.index.read().await;
-                    merging_state
-                        .merge_pending_state(&index, &mut new_index)
-                        .await
-                };
-
-                // Finally, sync the new index and pending state with write lock and state mutex
-                // Note: this is safe because we are holding the index write lock while clearing
-                // and re-setting the index (so no checkpoint can happen between).
-                let mut index = self.index.write().await;
-                index.clear();
-                self.state
-                    .lock()
-                    .await
-                    .sync_merge_state(free_list, &mut index, new_index)
-                    .await;
-            } else {
-                trace!("nothing to merge in this cycle");
-                continue;
-            }
-        }
-    }
 }
 
 type DataReader = Pin<Box<dyn Future<Output = Option<(Vec<u8>, IndexValue)>> + Send>>;
@@ -959,7 +1089,7 @@ impl ZettaCacheState {
                 // The metadata overwrote this data, so it's no longer in the cache.
                 // Remove from index and return None.
                 trace!(
-                    "{:?} at {:?} was overwritten by metadata allocator; removing from cache",
+                    "cache miss: {:?} at {:?} was overwritten by metadata allocator; removing from cache",
                     key,
                     value
                 );
@@ -969,6 +1099,18 @@ impl ZettaCacheState {
             self.remove_from_index(key, value);
             return data_reader_none();
         }
+        // If value.atime is before eviction cutoff, return a cache miss
+        if let Some(ms) = &self.merging_state {
+            if value.atime < ms.eviction_cutoff {
+                trace!(
+                    "cache miss: {:?} at {:?} was prior to eviction cutoff {:?}",
+                    key,
+                    value,
+                    ms.eviction_cutoff
+                );
+                return data_reader_none();
+            }
+        }
         trace!("cache hit: reading {:?} from {:?}", key, value);
         if value.atime != self.atime {
             self.atime_histogram.remove(value);
@@ -1165,7 +1307,13 @@ impl ZettaCacheState {
             .map(|extent| extent.location)
     }
 
-    async fn flush_checkpoint(&mut self, index: &ZettaCacheIndex) {
+    /// Flush out the current set of pending index changes. This is a recovery point in case of
+    /// a system crash between index rewrites.
+    async fn flush_checkpoint(
+        &mut self,
+        index: &ZettaCacheIndex,
+        next_index: Option<ZettaCacheIndexPhys>,
+    ) {
         debug!(
             "flushing checkpoint {:?}",
             self.super_phys.last_checkpoint_id.next()
         );
@@ -1181,7 +1329,7 @@ impl ZettaCacheState {
        // waiting down in the ExtentAllocator. If we get that working, we'll
        // still need to clean up the outstanding_reads entries that have
        // completed, at some point. Even as-is, letting them accumulate for a
-       // whole checkpoint might not be great. We might want a "cleaner" to
+       // whole checkpoint might not be great. It might be "cleaner" to
        // run every second and remove completed entries. Or have the read task
        // lock the outstanding_reads and remove itself (which might perform
        // worse due to contention on the global lock).
@@ -1203,17 +1351,26 @@ impl ZettaCacheState {
             self.pending_changes.len()
         );
 
+        let operation_log_phys = self.operation_log.flush().await;
+
+        // Note that it is possible to have a merge in progress with no next_index available.
+        // This can happen if we have not yet received any progress messages from the merge task.
+        // In this case we just store an empty "in progress" index in the checkpoint.
+        let merge_progress = self.merging_state.as_ref().map(|ms| {
+            (
+                ms.old_operation_log_phys.clone(),
+                next_index.unwrap_or_default(),
+            )
+        });
+
         let checkpoint = ZettaCheckpointPhys {
             generation: self.super_phys.last_checkpoint_id.next(),
             extent_allocator: self.extent_allocator.get_phys(),
             index: index.get_phys(),
-            operation_log: self.operation_log.flush().await,
-            merging_operation_log: self
-                .merging_state
-                .as_ref()
-                .map(|ms| ms.old_operation_log_phys.clone()),
+            operation_log: operation_log_phys,
             last_atime: self.atime,
             block_allocator: self.block_allocator.flush().await,
+            merge_progress,
         };
 
         let mut checkpoint_location = self.super_phys.last_checkpoint_extent.location
@@ -1265,20 +1422,64 @@ impl ZettaCacheState {
         );
     }
 
-    async fn get_merge_state(&mut self) -> Option<(Arc<MergeState>, ZettaCacheIndex)> {
-        // Check to see if there is enough state available to justify a merge
-        // Since we handle eviction as part of the merge process, also trigger a merge
-        // if the cache has passed a "high water" percent full level
-        if self.merging_state.is_none()
-            && self.pending_changes.len() < *MAX_PENDING_CHANGES
+    /// Restart a merge task from the saved checkpoint state
+    async fn resume_merge_task(
+        &mut self,
+        old_index: Arc<tokio::sync::RwLock<ZettaCacheIndex>>,
+        new_index_phys: ZettaCacheIndexPhys,
+        old_operation_log_phys: BlockBasedLogPhys<OperationLogEntry>,
+    ) -> tokio::sync::mpsc::Receiver<MergeMessage> {
+        let old_operation_log = BlockBasedLog::open(
+            self.block_access.clone(),
+            self.extent_allocator.clone(),
+            old_operation_log_phys.clone(),
+        );
+        let old_pending_changes =
+            ZettaCache::load_operation_log(&old_operation_log, &mut self.atime_histogram).await;
+        let next_index = ZettaCacheIndex::open(
+            self.block_access.clone(),
+            self.extent_allocator.clone(),
+            new_index_phys,
+        )
+        .await;
+        info!(
+            "restarting merge at {:?} with eviction atime {:?}",
+            next_index.last_key,
+            next_index.first_atime(),
+        );
+
+        let merging_state = Arc::new(MergeState {
+            eviction_cutoff: next_index.first_atime(),
+            old_pending_changes,
+            old_operation_log_phys,
+        });
+        self.merging_state = Some(merging_state.clone());
+
+        // The checkpoint task will be constantly reading from the channel, so we don't really need
+        // much of a buffer here. We use 100 because we might accumulate some messages while actually
+        // flushing out the checkpoint.
+        let (tx, rx) = tokio::sync::mpsc::channel(100);
+        tokio::spawn(async move { merging_state.merge_task(tx, old_index, next_index).await });
+
+        rx
+    }
+
+    /// Start a new merge task if there are enough pending changes
+    async fn try_start_merge_task(
+        &mut self,
+        old_index: Arc<tokio::sync::RwLock<ZettaCacheIndex>>,
+    ) -> Option<tokio::sync::mpsc::Receiver<MergeMessage>> {
+        if self.pending_changes.len() < *MAX_PENDING_CHANGES
             && self.atime_histogram.sum()
                 < (self.block_access.size() / 100) * *HIGH_WATER_CACHE_SIZE_PCT
         {
+            trace!(
+                "not starting new merge, only {} pending changes",
+                self.pending_changes.len()
+            );
             return None;
         }
 
-        // An eviction atime is used to maintain a target cache size.
-        // The new index will be created with an atime calculated from the target cache size.
         let target_size = (self.block_access.size() / 100) * *TARGET_CACHE_SIZE_PCT;
         info!(
             "target cache size for storage size {}GB is {}GB; {}MB used; {}MB high-water; {}MB freeing; histogram covers {}MB",
@@ -1289,80 +1490,66 @@ impl ZettaCacheState {
             self.block_allocator.get_freeing() / 1024 / 1024,
             self.atime_histogram.sum() / 1024 / 1024,
         );
-        let new_index = ZettaCacheIndex::open(
+        let eviction_atime = self.atime_histogram.atime_for_target_size(target_size);
+
+        let old_operation_log_phys = self.operation_log.flush().await;
+
+        // Create an empty operation log that is consistent with the empty pending state.
+        // Note that we don't want to just clear the existing operation log, since we are
+        // still preserving that in the merging state.
+        self.operation_log = BlockBasedLog::open(
+            self.block_access.clone(),
+            self.extent_allocator.clone(),
+            Default::default(),
+        );
+        let next_index = ZettaCacheIndex::open(
             self.block_access.clone(),
             self.extent_allocator.clone(),
-            ZettaCacheIndexPhys::new(self.atime_histogram.atime_for_target_size(target_size)),
+            ZettaCacheIndexPhys::new(eviction_atime),
         )
         .await;
-        info!(
-            "set new eviction atime to {:?}",
-            new_index.get_histogram_start()
-        );
-
-        // If we already have a merging state, this must have come from a checkpoint
-        // state with an active merge. Go ahead and use this recovered state rather
-        // than pulling the current pending state.
-        if let Some(merging_state) = self.merging_state.as_ref().cloned() {
-            return Some((merging_state, new_index));
-        }
 
         // Set up state with current pending changes and operation log in the new merging state.
         // Note that we are "taking" the current set of pending changes for the merge
         // and leaving an empty log behind to accumulate new changes.
-        let new_state = Arc::new(MergeState {
+        let merging_state = Arc::new(MergeState {
+            eviction_cutoff: eviction_atime,
             old_pending_changes: std::mem::take(&mut self.pending_changes),
-            old_operation_log_phys: self.operation_log.flush().await,
+            old_operation_log_phys,
         });
-        self.merging_state = Some(new_state.clone());
+        self.merging_state = Some(merging_state.clone());
 
-        // Create an empty operation log that is consistent with the empty pending state.
-        // Note that we don't want to just clear the existing operation log, since we are
-        // still preserving that physical state in the merging state.
-        self.operation_log = BlockBasedLog::open(
-            self.block_access.clone(),
-            self.extent_allocator.clone(),
-            Default::default(),
-        );
+        // The checkpoint task will be constantly reading from the channel, so we don't really need
+        // much of a buffer here. We use 100 because we might accumulate some messages while actually
+        // flushing out the checkpoint.
+        let (tx, rx) = tokio::sync::mpsc::channel(100);
+        tokio::spawn(async move { merging_state.merge_task(tx, old_index, next_index).await });
 
-        Some((new_state, new_index))
+        Some(rx)
     }
 
-    async fn sync_merge_state(
-        &mut self,
-        free_list: Vec<IndexEntry>,
-        index: &mut ZettaCacheIndex,
-        new_index: ZettaCacheIndex,
-    ) {
-        // Free the evicted blocks from the cache.
-        // Also remove any reference from the current set of pending changes.
-        debug!("freeing {} blocks", free_list.len());
-        for entry in free_list {
-            let pc = self.pending_changes.remove(&entry.key);
-            assert!(matches!(pc, Some(PendingChange::UpdateAtime(_)) | None));
-            self.block_allocator.free(entry.value.extent());
-        }
+    /// Switch to the new index returned from the merge task and clear the merging state.
+    /// Called with the old index write-locked.
+    async fn rotate_index(&mut self, index: &mut ZettaCacheIndex, next_index: ZettaCacheIndex) {
+        let merging_state = self.merging_state.as_ref().unwrap();
 
-        // Clear the operation log state associated with the merged changes
-        self.merging_state
-            .take()
-            .unwrap()
+        // Free up the extents that have been allocated for the merge pending state
+        merging_state
             .old_operation_log_phys
             .clone()
             .clear(self.extent_allocator.clone());
 
         // Move the "start" of the zettacache state histogram to reflect the new index
+        self.atime_histogram.reset_first(next_index.first_atime());
         trace!(
             "reset incore histogram start to {:?}",
-            new_index.get_histogram_start()
+            self.atime_histogram.first()
         );
-        self.atime_histogram
-            .reset_start(new_index.get_histogram_start());
 
-        // Finish the merge by switching the zettacache to the new index
-        // which will become persistent with the next checkpoint
-        *index = new_index;
+        // Free up the space used by the old index and rotate in the new index
+        index.clear();
+        *index = next_index;
 
-        assert!(self.merging_state.is_none());
+        self.merging_state = None;
     }
 }
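
Reviewer note on the checkpoint/merge protocol in this patch: `merge_task` streams `MergeMessage::Progress` updates (a flushed index phys plus the accumulated free list) over a bounded mpsc channel, and `checkpoint_task` drains that channel with `timeout_at` until its next checkpoint deadline, rotating in the new index when `MergeMessage::Complete` arrives. Below is a minimal, self-contained sketch of just that channel discipline — this is not code from the patch; the payload types are simplified stand-ins for `ZettaCacheIndexPhys` and the `IndexValue` free list, the intervals are shortened, and it assumes tokio with the `rt-multi-thread`, `macros`, `sync`, and `time` features.

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{sleep_until, timeout_at, Instant};

#[derive(Debug)]
enum MergeMessage {
    Progress(Vec<u64>), // stand-in for MergeProgress (index phys + free list)
    Complete(String),   // stand-in for the finished index
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(100);

    // Merge task: periodically report progress, then send the final result.
    tokio::spawn(async move {
        for batch in 0..5u64 {
            tokio::time::sleep(Duration::from_millis(200)).await;
            tx.send(MergeMessage::Progress(vec![batch])).await.unwrap();
        }
        tx.send(MergeMessage::Complete("next index".into())).await.unwrap();
    });

    // Checkpoint task: drain merge messages until the next checkpoint deadline,
    // then write a checkpoint; repeat until the merge completes.
    let mut next_tick = Instant::now();
    let mut done = false;
    while !done {
        // If a checkpoint overran its interval, push the deadline out rather
        // than letting missed ticks pile up.
        next_tick = std::cmp::max(Instant::now(), next_tick + Duration::from_millis(500));
        loop {
            match timeout_at(next_tick, rx.recv()).await {
                Ok(Some(MergeMessage::Progress(frees))) => println!("frees: {:?}", frees),
                Ok(Some(MergeMessage::Complete(index))) => {
                    println!("rotating to {:?}", index);
                    done = true;
                    break;
                }
                Ok(None) => panic!("channel closed before Complete"),
                Err(_) => break, // deadline reached
            }
        }
        sleep_until(next_tick).await;
        println!("flush checkpoint");
    }
}
```

The `std::cmp::max(now, next_tick + interval)` step mirrors the deadline handling that `checkpoint_task` needs so that a slow checkpoint delays the next tick instead of triggering a burst of immediate ones, and the bounded channel gives natural backpressure: if the checkpoint task falls behind, the merge task blocks on `send` rather than accumulating unbounded progress messages.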