From de0ea848e55f2b875e999f7f0c39c4bf6c341144 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Fri, 14 May 2021 14:52:32 +0200
Subject: [PATCH] add include_temp_doc_store flag in InnerSegmentMeta

Unset the flag on deserialization and after a segment is finalized;
set the flag when creating new instances.
---
 src/core/index_meta.rs         | 41 +++++++++++++++++++++++++++++-----
 src/core/segment_component.rs  |  2 +-
 src/indexer/index_writer.rs    |  9 ++++----
 src/indexer/segment_updater.rs | 19 ++++++++--------
 4 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index 05f5eef74e..0a93975836 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -4,9 +4,9 @@ use crate::schema::Schema;
 use crate::Opstamp;
 use census::{Inventory, TrackedObject};
 use serde::{Deserialize, Serialize};
-use std::collections::HashSet;
-use std::fmt;
 use std::path::PathBuf;
+use std::{collections::HashSet, sync::atomic::AtomicBool};
+use std::{fmt, sync::Arc};
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 struct DeleteMeta {
@@ -33,6 +33,7 @@ impl SegmentMetaInventory {
         let inner = InnerSegmentMeta {
             segment_id,
             max_doc,
+            include_temp_doc_store: Arc::new(AtomicBool::new(true)),
             deletes: None,
         };
         SegmentMeta::from(self.inventory.track(inner))
@@ -80,6 +81,15 @@ impl SegmentMeta {
         self.tracked.segment_id
     }
 
+    /// Removes the `SegmentComponent::TempStore` from the list of alive files,
+    /// allowing the temp docstore file to be deleted by the next
+    /// garbage collection run.
+    pub fn untrack_temp_docstore(&self) {
+        self.tracked
+            .include_temp_doc_store
+            .store(false, std::sync::atomic::Ordering::Relaxed);
+    }
+
     /// Returns the number of deleted documents.
     pub fn num_deleted_docs(&self) -> u32 {
         self.tracked
@@ -96,9 +106,20 @@ impl SegmentMeta {
     /// is by removing all files that have been created by tantivy
     /// and are not used by any segment anymore.
     pub fn list_files(&self) -> HashSet<PathBuf> {
-        SegmentComponent::iterator()
-            .map(|component| self.relative_path(*component))
-            .collect::<HashSet<PathBuf>>()
+        if self
+            .tracked
+            .include_temp_doc_store
+            .load(std::sync::atomic::Ordering::Relaxed)
+        {
+            SegmentComponent::iterator()
+                .map(|component| self.relative_path(*component))
+                .collect::<HashSet<PathBuf>>()
+        } else {
+            SegmentComponent::iterator()
+                .filter(|comp| *comp != &SegmentComponent::TempStore)
+                .map(|component| self.relative_path(*component))
+                .collect::<HashSet<PathBuf>>()
+        }
     }
 
     /// Returns the relative path of a component of our segment.
@@ -160,6 +181,7 @@ impl SegmentMeta {
             segment_id: inner_meta.segment_id,
             max_doc,
             deletes: None,
+            include_temp_doc_store: Arc::new(AtomicBool::new(true)),
         });
         SegmentMeta { tracked }
     }
@@ -173,6 +195,7 @@ impl SegmentMeta {
         let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
             segment_id: inner_meta.segment_id,
             max_doc: inner_meta.max_doc,
+            include_temp_doc_store: Arc::new(AtomicBool::new(true)),
             deletes: Some(delete_meta),
         });
         SegmentMeta { tracked }
@@ -184,6 +207,14 @@ struct InnerSegmentMeta {
     segment_id: SegmentId,
     max_doc: u32,
     deletes: Option<DeleteMeta>,
+    /// If set to true, the `SegmentComponent::TempStore` file is kept in the list of
+    /// alive files and is therefore protected from garbage collection. Used during merge.
+    #[serde(skip)]
+    #[serde(default = "default_temp_store")]
+    pub(crate) include_temp_doc_store: Arc<AtomicBool>,
+}
+fn default_temp_store() -> Arc<AtomicBool> {
+    Arc::new(AtomicBool::new(false))
 }
 
 impl InnerSegmentMeta {
diff --git a/src/core/segment_component.rs b/src/core/segment_component.rs
index 7df76c65eb..c5e07255b2 100644
--- a/src/core/segment_component.rs
+++ b/src/core/segment_component.rs
@@ -4,7 +4,7 @@ use std::slice;
 /// Each component is stored in its own file,
 /// using the pattern `segment_uuid`.`component_extension`,
 /// except the delete component that takes an `segment_uuid`.`delete_opstamp`.`component_extension`
-#[derive(Copy, Clone)]
+#[derive(Copy, Clone, Eq, PartialEq)]
 pub enum SegmentComponent {
     /// Postings (or inverted list). Sorted lists of document ids, associated to terms
     Postings,
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index f18fa1df9e..dee4fefbc6 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -239,11 +239,10 @@ fn index_documents(
         last_docstamp,
     )?;
 
-    let segment_entry = SegmentEntry::new(
-        segment_with_max_doc.meta().clone(),
-        delete_cursor,
-        delete_bitset_opt,
-    );
+    let meta = segment_with_max_doc.meta().clone();
+    // Update the segment_updater inventory so the temp docstore file can be garbage collected.
+    meta.untrack_temp_docstore();
+    let segment_entry = SegmentEntry::new(meta, delete_cursor, delete_bitset_opt);
     block_on(segment_updater.schedule_add_segment(segment_entry))?;
     Ok(true)
 }
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index eb89f4e1c0..0be2f93de1 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -246,7 +246,7 @@ pub(crate) struct InnerSegmentUpdater {
     //
     // This should be up to date as all update happen through
     // the unique active `SegmentUpdater`.
-    active_metas: RwLock<Arc<IndexMeta>>,
+    active_index_meta: RwLock<Arc<IndexMeta>>,
     pool: ThreadPool,
     merge_thread_pool: ThreadPool,
 
@@ -286,7 +286,7 @@ impl SegmentUpdater {
         })?;
         let index_meta = index.load_metas()?;
         Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
-            active_metas: RwLock::new(Arc::new(index_meta)),
+            active_index_meta: RwLock::new(Arc::new(index_meta)),
             pool,
             merge_thread_pool,
             index,
@@ -443,15 +443,15 @@ impl SegmentUpdater {
     }
 
     fn store_meta(&self, index_meta: &IndexMeta) {
-        *self.active_metas.write().unwrap() = Arc::new(index_meta.clone());
+        *self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone());
     }
 
-    fn load_metas(&self) -> Arc<IndexMeta> {
-        self.active_metas.read().unwrap().clone()
+    fn load_meta(&self) -> Arc<IndexMeta> {
+        self.active_index_meta.read().unwrap().clone()
     }
 
     pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation {
-        let commit_opstamp = self.load_metas().opstamp;
+        let commit_opstamp = self.load_meta().opstamp;
         MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec())
     }
 
@@ -546,7 +546,7 @@ impl SegmentUpdater {
             })
             .collect();
 
-        let commit_opstamp = self.load_metas().opstamp;
+        let commit_opstamp = self.load_meta().opstamp;
         let committed_merge_candidates = merge_policy
             .compute_merge_candidates(&committed_segments)
             .into_iter()
@@ -577,7 +577,7 @@ impl SegmentUpdater {
         {
             let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
             if let Some(delete_operation) = delete_cursor.get() {
-                let committed_opstamp = segment_updater.load_metas().opstamp;
+                let committed_opstamp = segment_updater.load_meta().opstamp;
                 if delete_operation.opstamp < committed_opstamp {
                     let index = &segment_updater.index;
                     let segment = index.segment(after_merge_segment_entry.meta().clone());
@@ -601,7 +601,7 @@ impl SegmentUpdater {
                 }
             }
         }
-        let previous_metas = segment_updater.load_metas();
+        let previous_metas = segment_updater.load_meta();
         let segments_status = segment_updater
             .segment_manager
             .end_merge(merge_operation.segment_ids(), after_merge_segment_entry)?;
@@ -613,6 +613,7 @@ impl SegmentUpdater {
             segment_updater.consider_merge_options().await;
         }
         // we drop all possible handle to a now useless `SegmentMeta`.
+        let _ = garbage_collect_files(segment_updater).await;
         Ok(())
     });
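
The flag only works because it never survives a round trip through the serialized meta:
`#[serde(skip)]` keeps it out of the written form and `#[serde(default = "default_temp_store")]`
resets it to false on load, so a reopened index no longer protects the temp docstore. Below is
a minimal standalone sketch of that behavior, assuming serde_json for the round trip; the names
`Meta`, `keep_temp_doc_store`, and `default_flag` are illustrative stand-ins, not tantivy items.

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use serde::{Deserialize, Serialize};

    // Stand-in for InnerSegmentMeta, reduced to the parts relevant to the flag.
    #[derive(Serialize, Deserialize)]
    struct Meta {
        max_doc: u32,
        // Never written out; rebuilt from `default_flag` on deserialization, so a
        // freshly loaded meta does not keep the temp docstore alive.
        #[serde(skip)]
        #[serde(default = "default_flag")]
        keep_temp_doc_store: Arc<AtomicBool>,
    }

    fn default_flag() -> Arc<AtomicBool> {
        Arc::new(AtomicBool::new(false))
    }

    fn main() {
        // Freshly created instances start with the flag raised, mirroring
        // `AtomicBool::new(true)` in the patch.
        let meta = Meta {
            max_doc: 10,
            keep_temp_doc_store: Arc::new(AtomicBool::new(true)),
        };

        // The flag never reaches the serialized form...
        let json = serde_json::to_string(&meta).unwrap();
        assert_eq!(json, r#"{"max_doc":10}"#);

        // ...so after a round trip it is back to `false`, and the temp docstore
        // is no longer protected from garbage collection.
        let reloaded: Meta = serde_json::from_str(&json).unwrap();
        assert!(!reloaded.keep_temp_doc_store.load(Ordering::Relaxed));

        // `untrack_temp_docstore` in the patch amounts to this store through the Arc.
        meta.keep_temp_doc_store.store(false, Ordering::Relaxed);
        assert!(!meta.keep_temp_doc_store.load(Ordering::Relaxed));
    }

Sharing the flag behind an `Arc<AtomicBool>` means every clone of the tracked meta observes
`untrack_temp_docstore` immediately, which is what lets `list_files` drop the temp store from
the alive list before the next garbage collection pass.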