add include_temp_doc_store flag in InnerSegmentMeta
unset the flag on deserialization and after a segment is finalized
set the flag when creating new instances
PSeitz committed May 14, 2021
1 parent 84da0be commit de0ea84
Showing 4 changed files with 51 additions and 20 deletions.
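The commit message describes the flag lifecycle: it starts as true for new segments, is flipped to false once the temporary doc store is no longer needed, and comes back as false after deserialization. The minimal stand-alone sketch below illustrates that lifecycle; Meta, new, and keeps_temp_docstore are made-up names rather than tantivy's actual API, and only untrack_temp_docstore mirrors the method added in this commit.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Illustrative stand-in for InnerSegmentMeta: the flag starts as `true`
// for freshly created segments and is flipped to `false` once the
// temporary doc store may be garbage collected.
struct Meta {
    include_temp_doc_store: Arc<AtomicBool>,
}

impl Meta {
    fn new() -> Meta {
        Meta {
            include_temp_doc_store: Arc::new(AtomicBool::new(true)),
        }
    }

    fn untrack_temp_docstore(&self) {
        self.include_temp_doc_store.store(false, Ordering::Relaxed);
    }

    fn keeps_temp_docstore(&self) -> bool {
        self.include_temp_doc_store.load(Ordering::Relaxed)
    }
}

fn main() {
    let meta = Meta::new();
    assert!(meta.keeps_temp_docstore()); // new segment: temp doc store is kept alive
    meta.untrack_temp_docstore();        // after finalize: eligible for garbage collection
    assert!(!meta.keeps_temp_docstore());
}
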
41 changes: 36 additions & 5 deletions src/core/index_meta.rs
@@ -4,9 +4,9 @@ use crate::schema::Schema;
use crate::Opstamp;
use census::{Inventory, TrackedObject};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
use std::{collections::HashSet, sync::atomic::AtomicBool};
use std::{fmt, sync::Arc};

#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
@@ -33,6 +33,7 @@ impl SegmentMetaInventory {
let inner = InnerSegmentMeta {
segment_id,
max_doc,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
deletes: None,
};
SegmentMeta::from(self.inventory.track(inner))
@@ -80,6 +81,15 @@ impl SegmentMeta {
self.tracked.segment_id
}

/// Removes the Component::TempStore from the alive list and
/// therefore marks the temp docstore file for deletion by
/// garbage collection.
pub fn untrack_temp_docstore(&self) {
self.tracked
.include_temp_doc_store
.store(false, std::sync::atomic::Ordering::Relaxed);
}

/// Returns the number of deleted documents.
pub fn num_deleted_docs(&self) -> u32 {
self.tracked
@@ -96,9 +106,20 @@
/// is by removing all files that have been created by tantivy
/// and are not used by any segment anymore.
pub fn list_files(&self) -> HashSet<PathBuf> {
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
if self
.tracked
.include_temp_doc_store
.load(std::sync::atomic::Ordering::Relaxed)
{
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
} else {
SegmentComponent::iterator()
.filter(|comp| *comp != &SegmentComponent::TempStore)
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
}
}

/// Returns the relative path of a component of our segment.
@@ -160,6 +181,7 @@ impl SegmentMeta {
segment_id: inner_meta.segment_id,
max_doc,
deletes: None,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
});
SegmentMeta { tracked }
}
@@ -173,6 +195,7 @@ impl SegmentMeta {
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc: inner_meta.max_doc,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
deletes: Some(delete_meta),
});
SegmentMeta { tracked }
@@ -184,6 +207,14 @@ struct InnerSegmentMeta {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
/// If you want to prevent the SegmentComponent::TempStore file from being
/// covered by garbage collection and deleted, set this to true. This is used during merge.
#[serde(skip)]
#[serde(default = "default_temp_store")]
pub(crate) include_temp_doc_store: Arc<AtomicBool>,
}
fn default_temp_store() -> Arc<AtomicBool> {
Arc::new(AtomicBool::new(false))
}

impl InnerSegmentMeta {
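The "unset on deserialization" part of the commit message is handled by the #[serde(skip)] and #[serde(default = "default_temp_store")] attributes shown above: the field is never serialized, and deserialized metas fall back to the default, which is false. Below is a minimal sketch of that serde pattern, assuming serde and serde_json as dependencies; the Meta struct and default_flag function are illustrative stand-ins, not tantivy's types.

use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Illustrative struct: the flag is excluded from (de)serialization and is
// rebuilt via `default_flag`, so it always comes back as `false`.
#[derive(Serialize, Deserialize)]
struct Meta {
    segment_id: String,
    #[serde(skip)]
    #[serde(default = "default_flag")]
    include_temp_doc_store: Arc<AtomicBool>,
}

fn default_flag() -> Arc<AtomicBool> {
    Arc::new(AtomicBool::new(false))
}

fn main() {
    // The JSON contains no flag at all; deserialization falls back to `default_flag`.
    let meta: Meta = serde_json::from_str(r#"{ "segment_id": "abc" }"#).unwrap();
    assert!(!meta.include_temp_doc_store.load(Ordering::Relaxed));
}
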
2 changes: 1 addition & 1 deletion src/core/segment_component.rs
@@ -4,7 +4,7 @@ use std::slice;
/// Each component is stored in its own file,
/// using the pattern `segment_uuid`.`component_extension`,
/// except the delete component that takes an `segment_uuid`.`delete_opstamp`.`component_extension`
#[derive(Copy, Clone)]
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum SegmentComponent {
/// Postings (or inverted list). Sorted lists of document ids, associated to terms
Postings,
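The Eq/PartialEq derive is what allows the list_files change in index_meta.rs to compare variants with != inside a filter. A tiny stand-alone illustration of that pattern follows; Component is a made-up enum, not tantivy's SegmentComponent.

// Comparing enum variants with `!=` requires `PartialEq`; without the
// derive, the `filter` closure below would not compile.
#[derive(Copy, Clone, Eq, PartialEq)]
enum Component {
    Postings,
    TempStore,
    Store,
}

fn main() {
    let all = [Component::Postings, Component::TempStore, Component::Store];
    let without_temp: Vec<Component> = all
        .iter()
        .filter(|component| **component != Component::TempStore)
        .copied()
        .collect();
    assert_eq!(without_temp.len(), 2);
}
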
9 changes: 4 additions & 5 deletions src/indexer/index_writer.rs
@@ -239,11 +239,10 @@ fn index_documents(
last_docstamp,
)?;

let segment_entry = SegmentEntry::new(
segment_with_max_doc.meta().clone(),
delete_cursor,
delete_bitset_opt,
);
let meta = segment_with_max_doc.meta().clone();
meta.untrack_temp_docstore();
// update segment_updater inventory to remove tempstore
let segment_entry = SegmentEntry::new(meta, delete_cursor, delete_bitset_opt);
block_on(segment_updater.schedule_add_segment(segment_entry))?;
Ok(true)
}
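Note that index_documents calls untrack_temp_docstore on a clone of the segment's meta. The flip is still visible to every other holder of that meta because the flag lives in shared state (an Arc<AtomicBool>), so cloning copies the handle rather than the value. A minimal stand-alone illustration of that sharing; Meta is a made-up stand-in for the tracked metadata.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Illustrative only: cloning a struct that holds an Arc<AtomicBool> clones
// the handle, not the flag, so flipping it through the clone is visible to
// every other holder (here, the original `tracked` value).
#[derive(Clone)]
struct Meta {
    include_temp_doc_store: Arc<AtomicBool>,
}

fn main() {
    let tracked = Meta {
        include_temp_doc_store: Arc::new(AtomicBool::new(true)),
    };
    let cloned = tracked.clone();
    cloned.include_temp_doc_store.store(false, Ordering::Relaxed);
    assert!(!tracked.include_temp_doc_store.load(Ordering::Relaxed));
}
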
19 changes: 10 additions & 9 deletions src/indexer/segment_updater.rs
@@ -246,7 +246,7 @@ pub(crate) struct InnerSegmentUpdater {
//
// This should be up to date as all update happen through
// the unique active `SegmentUpdater`.
active_metas: RwLock<Arc<IndexMeta>>,
active_index_meta: RwLock<Arc<IndexMeta>>,
pool: ThreadPool,
merge_thread_pool: ThreadPool,

@@ -286,7 +286,7 @@ impl SegmentUpdater {
})?;
let index_meta = index.load_metas()?;
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
active_metas: RwLock::new(Arc::new(index_meta)),
active_index_meta: RwLock::new(Arc::new(index_meta)),
pool,
merge_thread_pool,
index,
@@ -443,15 +443,15 @@ impl SegmentUpdater {
}

fn store_meta(&self, index_meta: &IndexMeta) {
*self.active_metas.write().unwrap() = Arc::new(index_meta.clone());
*self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone());
}

fn load_metas(&self) -> Arc<IndexMeta> {
self.active_metas.read().unwrap().clone()
fn load_meta(&self) -> Arc<IndexMeta> {
self.active_index_meta.read().unwrap().clone()
}

pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation {
let commit_opstamp = self.load_metas().opstamp;
let commit_opstamp = self.load_meta().opstamp;
MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec())
}

@@ -546,7 +546,7 @@ impl SegmentUpdater {
})
.collect();

let commit_opstamp = self.load_metas().opstamp;
let commit_opstamp = self.load_meta().opstamp;
let committed_merge_candidates = merge_policy
.compute_merge_candidates(&committed_segments)
.into_iter()
@@ -577,7 +577,7 @@
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_metas().opstamp;
let committed_opstamp = segment_updater.load_meta().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
@@ -601,7 +601,7 @@
}
}
}
let previous_metas = segment_updater.load_metas();
let previous_metas = segment_updater.load_meta();
let segments_status = segment_updater
.segment_manager
.end_merge(merge_operation.segment_ids(), after_merge_segment_entry)?;
@@ -613,6 +613,7 @@

segment_updater.consider_merge_options().await;
} // we drop all possible handle to a now useless `SegmentMeta`.

let _ = garbage_collect_files(segment_updater).await;
Ok(())
});
