Skip to content

Commit

Permalink
add include_temp_doc_store flag
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed May 14, 2021
1 parent 84da0be commit afb7eb4
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 18 deletions.
32 changes: 26 additions & 6 deletions src/core/index_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ impl SegmentMetaInventory {
#[derive(Clone)]
pub struct SegmentMeta {
tracked: TrackedObject<InnerSegmentMeta>,
/// If you want to avoid the SegmentComponent::TempStore file to be covered by
/// garbage collection and deleted, set this to true. This is used during merge.
pub(crate) include_temp_doc_store: bool,
}

impl fmt::Debug for SegmentMeta {
Expand All @@ -68,7 +71,10 @@ impl serde::Serialize for SegmentMeta {

impl From<TrackedObject<InnerSegmentMeta>> for SegmentMeta {
fn from(tracked: TrackedObject<InnerSegmentMeta>) -> SegmentMeta {
SegmentMeta { tracked }
SegmentMeta {
tracked,
include_temp_doc_store: true,
}
}
}

Expand Down Expand Up @@ -96,9 +102,16 @@ impl SegmentMeta {
/// is by removing all files that have been created by tantivy
/// and are not used by any segment anymore.
pub fn list_files(&self) -> HashSet<PathBuf> {
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
if self.include_temp_doc_store {
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
} else {
SegmentComponent::iterator()
.filter(|comp| *comp != &SegmentComponent::TempStore)
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
}
}

/// Returns the relative path of a component of our segment.
Expand Down Expand Up @@ -161,7 +174,10 @@ impl SegmentMeta {
max_doc,
deletes: None,
});
SegmentMeta { tracked }
SegmentMeta {
tracked,
include_temp_doc_store: true,
}
}

#[doc(hidden)]
Expand All @@ -175,7 +191,10 @@ impl SegmentMeta {
max_doc: inner_meta.max_doc,
deletes: Some(delete_meta),
});
SegmentMeta { tracked }
SegmentMeta {
tracked,
include_temp_doc_store: true,
}
}
}

Expand All @@ -190,6 +209,7 @@ impl InnerSegmentMeta {
pub fn track(self, inventory: &SegmentMetaInventory) -> SegmentMeta {
SegmentMeta {
tracked: inventory.inventory.track(self),
include_temp_doc_store: true,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/segment_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::slice;
/// Each component is stored in its own file,
/// using the pattern `segment_uuid`.`component_extension`,
/// except the delete component that takes an `segment_uuid`.`delete_opstamp`.`component_extension`
#[derive(Copy, Clone)]
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum SegmentComponent {
/// Postings (or inverted list). Sorted lists of document ids, associated to terms
Postings,
Expand Down
5 changes: 4 additions & 1 deletion src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,11 @@ fn index_documents(
last_docstamp,
)?;

let mut meta = segment_with_max_doc.meta().clone();
meta.include_temp_doc_store = false;
// update segment_updater inventory to remove tempstore
let segment_entry = SegmentEntry::new(
segment_with_max_doc.meta().clone(),
meta,
delete_cursor,
delete_bitset_opt,
);
Expand Down
21 changes: 11 additions & 10 deletions src/indexer/segment_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub fn merge_segments<Dir: Directory>(
.any(|index| index.settings() != &target_settings)
{
return Err(crate::TantivyError::InvalidArgument(
"Attempt to merge indices with different index_settings".to_string(),
"Attempt to merge indices with different index_settings".to_string(),
));
}

Expand Down Expand Up @@ -246,7 +246,7 @@ pub(crate) struct InnerSegmentUpdater {
//
// This should be up to date as all update happen through
// the unique active `SegmentUpdater`.
active_metas: RwLock<Arc<IndexMeta>>,
active_index_meta: RwLock<Arc<IndexMeta>>,
pool: ThreadPool,
merge_thread_pool: ThreadPool,

Expand Down Expand Up @@ -286,7 +286,7 @@ impl SegmentUpdater {
})?;
let index_meta = index.load_metas()?;
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
active_metas: RwLock::new(Arc::new(index_meta)),
active_index_meta: RwLock::new(Arc::new(index_meta)),
pool,
merge_thread_pool,
index,
Expand Down Expand Up @@ -443,15 +443,15 @@ impl SegmentUpdater {
}

fn store_meta(&self, index_meta: &IndexMeta) {
*self.active_metas.write().unwrap() = Arc::new(index_meta.clone());
*self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone());
}

fn load_metas(&self) -> Arc<IndexMeta> {
self.active_metas.read().unwrap().clone()
fn load_meta(&self) -> Arc<IndexMeta> {
self.active_index_meta.read().unwrap().clone()
}

pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation {
let commit_opstamp = self.load_metas().opstamp;
let commit_opstamp = self.load_meta().opstamp;
MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec())
}

Expand Down Expand Up @@ -546,7 +546,7 @@ impl SegmentUpdater {
})
.collect();

let commit_opstamp = self.load_metas().opstamp;
let commit_opstamp = self.load_meta().opstamp;
let committed_merge_candidates = merge_policy
.compute_merge_candidates(&committed_segments)
.into_iter()
Expand Down Expand Up @@ -577,7 +577,7 @@ impl SegmentUpdater {
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_metas().opstamp;
let committed_opstamp = segment_updater.load_meta().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
Expand All @@ -601,7 +601,7 @@ impl SegmentUpdater {
}
}
}
let previous_metas = segment_updater.load_metas();
let previous_metas = segment_updater.load_meta();
let segments_status = segment_updater
.segment_manager
.end_merge(merge_operation.segment_ids(), after_merge_segment_entry)?;
Expand All @@ -613,6 +613,7 @@ impl SegmentUpdater {

segment_updater.consider_merge_options().await;
} // we drop all possible handle to a now useless `SegmentMeta`.

let _ = garbage_collect_files(segment_updater).await;
Ok(())
});
Expand Down

0 comments on commit afb7eb4

Please sign in to comment.