From 86075d6560099d051988bbad36bb8c5f4e1ee87e Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Fri, 3 Sep 2021 12:34:26 +0200
Subject: [PATCH 1/9] add merge for DeleteBitSet, allow custom DeleteBitSet on merge

---
 common/src/bitset.rs           |  2 ++
 src/fastfield/mod.rs           |  1 +
 src/indexer/merger.rs          | 30 ++++++++++++++++++++++++++++++
 src/indexer/segment_updater.rs | 21 +++++++++++++++++++++
 4 files changed, 54 insertions(+)

diff --git a/common/src/bitset.rs b/common/src/bitset.rs
index 2de5ab5e89..5192c502d3 100644
--- a/common/src/bitset.rs
+++ b/common/src/bitset.rs
@@ -248,6 +248,7 @@ impl BitSet {
     }

     /// Returns the number of elements in the `BitSet`.
+    #[inline]
     pub fn len(&self) -> usize {
         self.len as usize
     }
@@ -297,6 +298,7 @@ impl BitSet {
             .map(|delta_bucket| bucket + delta_bucket as u32)
     }

+    #[inline]
     pub fn max_value(&self) -> u32 {
         self.max_value
     }
diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs
index dd100074c9..8cce202950 100644
--- a/src/fastfield/mod.rs
+++ b/src/fastfield/mod.rs
@@ -26,6 +26,7 @@ Read access performance is comparable to that of an array lookup.
 pub use self::alive_bitset::write_alive_bitset;
 pub use self::alive_bitset::AliveBitSet;
 pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
+pub use self::delete::merge_delete_bitset;
 pub use self::error::{FastFieldNotAvailableError, Result};
 pub use self::facet_reader::FacetReader;
 pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 66bd30de7d..8ebeb7ad74 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1,4 +1,5 @@
 use crate::error::DataCorruption;
+use crate::fastfield::merge_delete_bitset;
 use crate::fastfield::CompositeFastFieldSerializer;
 use crate::fastfield::DynamicFastFieldReader;
 use crate::fastfield::FastFieldDataAccess;
@@ -156,6 +157,24 @@ impl IndexMerger {
         schema: Schema,
         index_settings: IndexSettings,
         segments: &[Segment],
+    ) -> crate::Result<IndexMerger> {
+        let delete_bitsets = segments.iter().map(|_| None).collect_vec();
+        Self::open_with_custom_delete_set(schema, index_settings, segments, delete_bitsets)
+    }
+
+    // Creates a merger with a custom delete set.
+    // For every segment, a delete bitset can be provided, which
+    // will be merged with the existing bitset. Make sure each
+    // bitset's index corresponds to its segment's ordinal.
+    //
+    // This can be used to merge, but also to apply an additional filter.
+    // One use case is demux, which is basically taking a list of
+    // segments and partitioning them, e.g. by a value in a field.
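+    //
+    // A rough usage sketch (not part of this change; `DeleteBitSet::for_test`
+    // is the test-only constructor used in the tests below, and the segment
+    // count and bitset sizes here are illustrative only):
+    //
+    //     // drop docid 0 of the first segment, leave the second untouched
+    //     let bitsets = vec![Some(DeleteBitSet::for_test(&[0], 2)), None];
+    //     let merger = IndexMerger::open_with_custom_delete_set(
+    //         schema, index_settings, &segments, bitsets)?;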
+    pub fn open_with_custom_delete_set(
+        schema: Schema,
+        index_settings: IndexSettings,
+        segments: &[Segment],
+        delete_bitset_opt: Vec<Option<DeleteBitSet>>,
     ) -> crate::Result<IndexMerger> {
         let mut readers = vec![];
         let mut max_doc: u32 = 0u32;
@@ -166,6 +185,17 @@ impl IndexMerger {
                 readers.push(reader);
             }
         }
+        for (reader, new_delete_bitset_opt) in readers.iter_mut().zip(delete_bitset_opt.into_iter())
+        {
+            if let Some(new_delete_bitset) = new_delete_bitset_opt {
+                if let Some(existing_bitset) = reader.delete_bitset_opt.as_mut() {
+                    let merged_bitset = merge_delete_bitset(&new_delete_bitset, existing_bitset);
+                    reader.delete_bitset_opt = Some(merged_bitset);
+                } else {
+                    reader.delete_bitset_opt = Some(new_delete_bitset);
+                }
+            }
+        }
         if let Some(sort_by_field) = index_settings.sort_by_field.as_ref() {
             readers = Self::sort_readers_by_min_sort_field(readers, sort_by_field)?;
         }
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index bcd89a5fb5..bde97bb972 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -7,6 +7,7 @@ use crate::core::SegmentId;
 use crate::core::SegmentMeta;
 use crate::core::META_FILEPATH;
 use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
+use crate::fastfield::DeleteBitSet;
 use crate::indexer::delete_queue::DeleteCursor;
 use crate::indexer::index_writer::advance_deletes;
 use crate::indexer::merge_operation::MergeOperationInventory;
@@ -162,6 +163,26 @@ fn merge(
 pub fn merge_segments<Dir: Directory>(
     indices: &[Index],
     output_directory: Dir,
+) -> crate::Result<Index> {
+    merge_filtered_segments(indices, vec![], output_directory)
+}
+
+/// Advanced: Merges a list of segments from different indices into a new index.
+///
+/// Returns a `TantivyError` if the indices list is empty or if their
+/// schemas don't match.
+///
+/// `output_directory` is assumed to be empty.
+///
+/// # Warning
+/// This function does NOT check whether an `IndexWriter` is running. It is not
+/// meant to be used while an `IndexWriter` is running for the origin indices or
+/// the destination index.
+#[doc(hidden)]
+pub fn merge_filtered_segments<Dir: Directory>(
+    indices: &[Index],
+    _filter_docids: Vec<Option<DeleteBitSet>>,
+    output_directory: Dir,
 ) -> crate::Result<Index> {
     if indices.is_empty() {
         // If there are no indices to merge, there is no need to do anything.
From b60e5922ecdd116fed62ec3be9518ae1b51c58c9 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Mon, 6 Sep 2021 13:00:45 +0200
Subject: [PATCH 2/9] forward delete bitsets on merge, add tests

---
 src/core/segment_reader.rs     |  12 ++
 src/indexer/merger.rs          |  10 +-
 src/indexer/mod.rs             |   2 +-
 src/indexer/segment_updater.rs | 268 +++++++++++++++++++++++++++++----
 src/lib.rs                     |   2 +-
 5 files changed, 253 insertions(+), 41 deletions(-)

diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index 0180868ead..4c6123b000 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -5,6 +5,7 @@ use crate::core::SegmentId;
 use crate::directory::CompositeFile;
 use crate::directory::FileSlice;
 use crate::error::DataCorruption;
+use crate::fastfield::merge_delete_bitset;
 use crate::fastfield::AliveBitSet;
 use crate::fastfield::FacetReader;
 use crate::fastfield::FastFieldReaders;
@@ -69,6 +70,17 @@ impl SegmentReader {
         &self.schema
     }

+    /// Merges the passed bitset with the existing one.
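+    /// A doc ends up deleted if it is deleted in *either* bitset, i.e. the
+    /// delete sets are unioned: existing deletes {1} merged with passed
+    /// deletes {2} yield {1, 2}. `num_docs` is recomputed accordingly.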
+ pub fn apply_delete_bitset(&mut self, delete_bitset: DeleteBitSet) { + if let Some(existing_bitset) = self.delete_bitset_opt.as_mut() { + let merged_bitset = merge_delete_bitset(&delete_bitset, existing_bitset); + self.delete_bitset_opt = Some(merged_bitset); + } else { + self.delete_bitset_opt = Some(delete_bitset); + } + self.num_docs = self.max_doc - self.num_deleted_docs(); + } + /// Return the number of documents that have been /// deleted in the segment. pub fn num_deleted_docs(&self) -> DocId { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 8ebeb7ad74..30a9dd7d7a 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,5 +1,4 @@ use crate::error::DataCorruption; -use crate::fastfield::merge_delete_bitset; use crate::fastfield::CompositeFastFieldSerializer; use crate::fastfield::DynamicFastFieldReader; use crate::fastfield::FastFieldDataAccess; @@ -181,20 +180,15 @@ impl IndexMerger { for segment in segments { if segment.meta().num_docs() > 0 { let reader = SegmentReader::open(segment)?; - max_doc += reader.num_docs(); readers.push(reader); } } for (reader, new_delete_bitset_opt) in readers.iter_mut().zip(delete_bitset_opt.into_iter()) { if let Some(new_delete_bitset) = new_delete_bitset_opt { - if let Some(existing_bitset) = reader.delete_bitset_opt.as_mut() { - let merged_bitset = merge_delete_bitset(&new_delete_bitset, existing_bitset); - reader.delete_bitset_opt = Some(merged_bitset); - } else { - reader.delete_bitset_opt = Some(new_delete_bitset); - } + reader.apply_delete_bitset(new_delete_bitset); } + max_doc += reader.num_docs(); } if let Some(sort_by_field) = index_settings.sort_by_field.as_ref() { readers = Self::sort_readers_by_min_sort_field(readers, sort_by_field)?; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 27d9fee968..fb9770cebd 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -26,7 +26,7 @@ pub use self::prepared_commit::PreparedCommit; pub use self::segment_entry::SegmentEntry; pub use self::segment_manager::SegmentManager; pub use self::segment_serializer::SegmentSerializer; -pub use self::segment_updater::merge_segments; +pub use self::segment_updater::merge_indices; pub use self::segment_writer::SegmentWriter; /// Alias for the default merge policy, which is the `LogMergePolicy`. diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index bde97bb972..b47b6dc9f2 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -160,14 +160,41 @@ fn merge( /// meant to work if you have an IndexWriter running for the origin indices, or /// the destination Index. #[doc(hidden)] -pub fn merge_segments( +pub fn merge_indices( indices: &[Index], output_directory: Dir, ) -> crate::Result { - merge_filtered_segments(indices, vec![], output_directory) + if indices.is_empty() { + // If there are no indices to merge, there is no need to do anything. 
+        return Err(crate::TantivyError::InvalidArgument(
+            "No indices given to merge".to_string(),
+        ));
+    }
+
+    let target_settings = indices[0].settings().clone();
+
+    // let's check that all of the indices have the same index settings
+    if indices
+        .iter()
+        .skip(1)
+        .any(|index| index.settings() != &target_settings)
+    {
+        return Err(crate::TantivyError::InvalidArgument(
+            "Attempt to merge indices with different index_settings".to_string(),
+        ));
+    }
+
+    let mut segments: Vec<Segment> = Vec::new();
+    for index in indices {
+        segments.extend(index.searchable_segments()?);
+    }
+
+    let non_filter = segments.iter().map(|_| None).collect::<Vec<_>>();
+    merge_filtered_segments(&segments, target_settings, non_filter, output_directory)
 }

 /// Advanced: Merges a list of segments from different indices into a new index.
+/// Additionally, you can provide a delete bitset for each segment to ignore docids.
 ///
 /// Returns a `TantivyError` if the indices list is empty or if their
 /// schemas don't match.
 ///
@@ -180,22 +207,22 @@ pub fn merge_segments(
 /// the destination index.
 #[doc(hidden)]
 pub fn merge_filtered_segments<Dir: Directory>(
-    indices: &[Index],
-    _filter_docids: Vec<Option<DeleteBitSet>>,
+    segments: &[Segment],
+    target_settings: IndexSettings,
+    filter_docids: Vec<Option<DeleteBitSet>>,
     output_directory: Dir,
 ) -> crate::Result<Index> {
-    if indices.is_empty() {
+    if segments.is_empty() {
         // If there are no indices to merge, there is no need to do anything.
         return Err(crate::TantivyError::InvalidArgument(
-            "No indices given to merge".to_string(),
+            "No segments given to merge".to_string(),
         ));
     }

-    let target_schema = indices[0].schema();
-    let target_settings = indices[0].settings().clone();
+    let target_schema = segments[0].schema();

     // let's check that all of the indices have the same schema
-    if indices
+    if segments
         .iter()
         .skip(1)
         .any(|index| index.schema() != target_schema)
@@ -204,29 +231,19 @@ pub fn merge_filtered_segments(
             "Attempt to merge different schema indices".to_string(),
         ));
     }

-    // let's check that all of the indices have the same index settings
-    if indices
-        .iter()
-        .skip(1)
-        .any(|index| index.settings() != &target_settings)
-    {
-        return Err(crate::TantivyError::InvalidArgument(
-            "Attempt to merge indices with different index_settings".to_string(),
-        ));
-    }
-
-    let mut segments: Vec<Segment> = Vec::new();
-    for index in indices {
-        segments.extend(index.searchable_segments()?);
-    }
-
-    let mut merged_index = Index::create(output_directory, target_schema.clone(), target_settings)?;
+    let mut merged_index = Index::create(
+        output_directory,
+        target_schema.clone(),
+        target_settings.clone(),
+    )?;
     let merged_segment = merged_index.new_segment();
     let merged_segment_id = merged_segment.id();
-    let merger: IndexMerger = IndexMerger::open(
+    let merger: IndexMerger = IndexMerger::open_with_custom_delete_set(
         merged_index.schema(),
         merged_index.settings().clone(),
         &segments[..],
+        filter_docids,
     )?;
     let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
     let num_docs = merger.write(segment_serializer)?;
@@ -246,7 +263,7 @@ pub fn merge_filtered_segments(
     );

     let index_meta = IndexMeta {
-        index_settings: indices[0].load_metas()?.index_settings, // index_settings of all segments should be the same
+        index_settings: target_settings, // index_settings of all segments should be the same
         segments: vec![segment_meta],
         schema: target_schema,
         opstamp: 0u64,
@@ -667,11 +684,18 @@ impl SegmentUpdater {

 #[cfg(test)]
 mod tests {
-    use super::merge_segments;
+    use super::merge_indices;
+    use crate::collector::TopDocs;
     use
crate::directory::RamDirectory; + use crate::fastfield::DeleteBitSet; use crate::indexer::merge_policy::tests::MergeWheneverPossible; + use crate::indexer::merger::IndexMerger; + use crate::indexer::segment_updater::merge_filtered_segments; + use crate::query::QueryParser; use crate::schema::*; + use crate::DocAddress; use crate::Index; + use crate::Segment; #[test] fn test_delete_during_merge() -> crate::Result<()> { @@ -818,7 +842,7 @@ mod tests { assert_eq!(indices.len(), 3); let output_directory = RamDirectory::default(); - let index = merge_segments(&indices, output_directory)?; + let index = merge_indices(&indices, output_directory)?; assert_eq!(index.schema(), schema); let segments = index.searchable_segments()?; @@ -832,7 +856,7 @@ mod tests { #[test] fn test_merge_empty_indices_array() { - let merge_result = merge_segments(&[], RamDirectory::default()); + let merge_result = merge_indices(&[], RamDirectory::default()); assert!(merge_result.is_err()); } @@ -859,9 +883,191 @@ mod tests { }; // mismatched schema index list - let result = merge_segments(&[first_index, second_index], RamDirectory::default()); + let result = merge_indices(&[first_index, second_index], RamDirectory::default()); assert!(result.is_err()); Ok(()) } + + #[test] + fn test_merge_filtered_segments() -> crate::Result<()> { + let first_index = { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests()?; + index_writer.add_document(doc!(text_field=>"some text 1")); + index_writer.add_document(doc!(text_field=>"some text 2")); + index_writer.commit()?; + index + }; + + let second_index = { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests()?; + index_writer.add_document(doc!(text_field=>"some text 3")); + index_writer.add_document(doc!(text_field=>"some text 4")); + index_writer.delete_term(Term::from_field_text(text_field, "4")); + + index_writer.commit()?; + index + }; + + let mut segments: Vec = Vec::new(); + segments.extend(first_index.searchable_segments()?); + segments.extend(second_index.searchable_segments()?); + + let target_settings = first_index.settings().clone(); + + let filter_segment_1 = DeleteBitSet::for_test(&[1], 2); + let filter_segment_2 = DeleteBitSet::for_test(&[0], 2); + + let filter_segments = vec![Some(filter_segment_1), Some(filter_segment_2)]; + + let merged_index = merge_filtered_segments( + &segments, + target_settings, + filter_segments, + RamDirectory::default(), + )?; + + let segments = merged_index.searchable_segments()?; + assert_eq!(segments.len(), 1); + + let segment_metas = segments[0].meta(); + assert_eq!(segment_metas.num_deleted_docs(), 0); + assert_eq!(segment_metas.num_docs(), 1); + + Ok(()) + } + + #[test] + fn test_merge_single_filtered_segments() -> crate::Result<()> { + let first_index = { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests()?; + index_writer.add_document(doc!(text_field=>"test text")); + index_writer.add_document(doc!(text_field=>"some text 2")); + + index_writer.add_document(doc!(text_field=>"some text 3")); + index_writer.add_document(doc!(text_field=>"some text 
4")); + + index_writer.delete_term(Term::from_field_text(text_field, "4")); + + index_writer.commit()?; + index + }; + + let mut segments: Vec = Vec::new(); + segments.extend(first_index.searchable_segments()?); + + let target_settings = first_index.settings().clone(); + + let filter_segment = DeleteBitSet::for_test(&[0], 4); + + let filter_segments = vec![Some(filter_segment)]; + + let index = merge_filtered_segments( + &segments, + target_settings, + filter_segments, + RamDirectory::default(), + )?; + + let segments = index.searchable_segments()?; + assert_eq!(segments.len(), 1); + + let segment_metas = segments[0].meta(); + assert_eq!(segment_metas.num_deleted_docs(), 0); + assert_eq!(segment_metas.num_docs(), 2); + + let searcher = index.reader().unwrap().searcher(); + { + let text_field = index.schema().get_field("text").unwrap(); + + let do_search = |term: &str| { + let query = QueryParser::for_index(&index, vec![text_field]) + .parse_query(term) + .unwrap(); + let top_docs: Vec<(f32, DocAddress)> = + searcher.search(&query, &TopDocs::with_limit(3)).unwrap(); + + top_docs.iter().map(|el| el.1.doc_id).collect::>() + }; + + assert_eq!(do_search("test"), vec![] as Vec); + assert_eq!(do_search("text"), vec![0, 1]); + } + + Ok(()) + } + + #[test] + fn test_apply_docid_filter_in_merger() -> crate::Result<()> { + let first_index = { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests()?; + index_writer.add_document(doc!(text_field=>"some text 1")); + index_writer.add_document(doc!(text_field=>"some text 2")); + + index_writer.add_document(doc!(text_field=>"some text 3")); + index_writer.add_document(doc!(text_field=>"some text 4")); + + index_writer.delete_term(Term::from_field_text(text_field, "4")); + + index_writer.commit()?; + index + }; + + let mut segments: Vec = Vec::new(); + segments.extend(first_index.searchable_segments()?); + + let target_settings = first_index.settings().clone(); + { + let filter_segment = DeleteBitSet::for_test(&[1], 4); + let filter_segments = vec![Some(filter_segment)]; + let target_schema = segments[0].schema(); + let merged_index = Index::create( + RamDirectory::default(), + target_schema.clone(), + target_settings.clone(), + )?; + let merger: IndexMerger = IndexMerger::open_with_custom_delete_set( + merged_index.schema(), + merged_index.settings().clone(), + &segments[..], + filter_segments, + )?; + + let docids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect(); + assert_eq!(docids_alive, vec![0, 2]); + } + + { + let filter_segments = vec![None]; + let target_schema = segments[0].schema(); + let merged_index = Index::create( + RamDirectory::default(), + target_schema.clone(), + target_settings.clone(), + )?; + let merger: IndexMerger = IndexMerger::open_with_custom_delete_set( + merged_index.schema(), + merged_index.settings().clone(), + &segments[..], + filter_segments, + )?; + + let docids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect(); + assert_eq!(docids_alive, vec![0, 1, 2]); + } + + Ok(()) + } } diff --git a/src/lib.rs b/src/lib.rs index 314ebb93a3..33477784ea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -169,7 +169,7 @@ pub use crate::core::{ }; pub use crate::core::{InvertedIndexReader, SegmentReader}; pub use crate::directory::Directory; -pub use crate::indexer::merge_segments; +pub use crate::indexer::merge_indices; pub use 
crate::indexer::operation::UserOperation;
 pub use crate::indexer::IndexWriter;
 pub use crate::postings::Postings;
From 1ad84f3b330281d2835bdb0ea4554c8d6897e7a7 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Tue, 7 Sep 2021 12:30:55 +0200
Subject: [PATCH 3/9] add demux operation and tests

---
 src/indexer/demuxer.rs | 339 +++++++++++++++++++++++++++++++++++++++++
 src/indexer/mod.rs     |   2 +
 src/lib.rs             |   2 +
 3 files changed, 343 insertions(+)
 create mode 100644 src/indexer/demuxer.rs

diff --git a/src/indexer/demuxer.rs b/src/indexer/demuxer.rs
new file mode 100644
index 0000000000..fb75efaff6
--- /dev/null
+++ b/src/indexer/demuxer.rs
@@ -0,0 +1,339 @@
+use common::BitSet;
+use itertools::Itertools;
+
+use crate::fastfield::DeleteBitSet;
+use crate::{merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal};
+
+/// DemuxMapping can be used to reorganize data from multiple segments.
+/// e.g. if you have two tenant ids TENANT_A and TENANT_B and two segments with
+/// the documents (simplified)
+/// Seg 1 [TENANT_A, TENANT_B]
+/// Seg 2 [TENANT_A, TENANT_B]
+///
+/// You may want to group your documents to
+/// Seg 1 [TENANT_A, TENANT_A]
+/// Seg 2 [TENANT_B, TENANT_B]
+///
+/// Demuxing is the tool for that.
+/// Semantically, you can define a mapping from [old segment ordinal, old docid] -> [new segment ordinal].
+#[derive(Debug, Default)]
+pub struct DemuxMapping {
+    /// [index old segment ordinal] -> [index docid] = new segment ordinal
+    mapping: Vec<DocidToSegmentOrdinal>,
+}
+
+/// DocidToSegmentOrdinal maps from a docid within a segment to the new segment ordinal for demuxing.
+#[derive(Debug, Default)]
+pub struct DocidToSegmentOrdinal {
+    docid_index_to_segment_ordinal: Vec<SegmentOrdinal>,
+}
+
+impl DocidToSegmentOrdinal {
+    /// Creates a new DocidToSegmentOrdinal with a size of num_docids.
+    /// Initially all docids point to segment ordinal 0 and need to be set
+    /// via the `set` method.
+    pub fn new(num_docids: usize) -> Self {
+        let mut vec = vec![];
+        vec.resize(num_docids, 0);
+
+        DocidToSegmentOrdinal {
+            docid_index_to_segment_ordinal: vec,
+        }
+    }
+
+    /// Associates a docid with the new SegmentOrdinal.
+    pub fn set(&mut self, docid: u32, segment_ordinal: SegmentOrdinal) {
+        self.docid_index_to_segment_ordinal[docid as usize] = segment_ordinal;
+    }
+
+    /// Iterates over the new SegmentOrdinal in the order of the docid.
+    pub fn iter(&self) -> impl Iterator<Item = &SegmentOrdinal> {
+        self.docid_index_to_segment_ordinal.iter()
+    }
+}
+
+impl DemuxMapping {
+    /// Creates a new empty DemuxMapping.
+    pub fn empty() -> Self {
+        DemuxMapping {
+            mapping: Default::default(),
+        }
+    }
+
+    /// Creates a DemuxMapping from existing mapping data.
+    pub fn new(mapping: Vec<DocidToSegmentOrdinal>) -> Self {
+        DemuxMapping { mapping }
+    }
+
+    /// Adds a DocidToSegmentOrdinal. The order of the `add` calls
+    /// defines the old segment ordinal, e.g. the first `add` = ordinal 0.
+    pub fn add(&mut self, segment_mapping: DocidToSegmentOrdinal) {
+        self.mapping.push(segment_mapping);
+    }
+
+    /// Returns the old number of segments.
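+    /// e.g. after two `add` calls this returns 2, one ordinal per source segment.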
+ pub fn get_old_num_segments(&self) -> usize { + self.mapping.len() + } +} + +fn get_delete_bitsets( + demux_mapping: &DemuxMapping, + target_segment_ordinal: SegmentOrdinal, + max_value_per_segment: &[u32], +) -> Vec { + let mut bitsets: Vec<_> = max_value_per_segment + .iter() + .map(|max_value| BitSet::with_max_value(*max_value)) + .collect(); + + for (old_segment_ordinal, docid_to_new_segment) in demux_mapping.mapping.iter().enumerate() { + let bitset_for_segment = &mut bitsets[old_segment_ordinal]; + for docid in docid_to_new_segment + .iter() + .enumerate() + .filter(|(_docid, new_segment_ordinal)| **new_segment_ordinal != target_segment_ordinal) + .map(|(docid, _)| docid) + { + // mark document as deleted if segment ordinal is not target segment ordinal + bitset_for_segment.insert(docid as u32); + } + } + + bitsets + .iter() + .map(|bitset| DeleteBitSet::from_bitset(bitset, bitset.max_value())) + .collect_vec() +} + +/// Demux the segments according to `demux_mapping`. See `DemuxMapping`. +/// The number of output_directories need to match max new segment ordinal from `demux_mapping`. +/// +/// The ordinal of `segments` need to match the ordinals in `demux_mapping`. +pub fn demux( + segments: &[Segment], + demux_mapping: &DemuxMapping, + target_settings: IndexSettings, + mut output_directories: Vec, +) -> crate::Result> { + output_directories.reverse(); + let max_value_per_segment = segments + .iter() + .map(|seg| seg.meta().max_doc()) + .collect_vec(); + + let mut indices = vec![]; + for target_segment_ordinal in 0..output_directories.len() { + let delete_bitsets = get_delete_bitsets( + demux_mapping, + target_segment_ordinal as u32, + &max_value_per_segment, + ) + .into_iter() + .map(|bitset| Some(bitset)) + .collect_vec(); + + let index = merge_filtered_segments( + segments, + target_settings.clone(), + delete_bitsets, + output_directories + .pop() + .expect("no enough output_directories provided"), + )?; + indices.push(index); + } + Ok(indices) +} + +#[cfg(test)] +mod tests { + use crate::{ + collector::TopDocs, + directory::RamDirectory, + query::QueryParser, + schema::{Schema, TEXT}, + DocAddress, Term, + }; + + use super::*; + + #[test] + fn demux_map_to_deletebitset_test() { + let max_value = 2; + let mut demux_mapping = DemuxMapping::default(); + //segment ordinal 0 mapping + let mut docid_to_segment = DocidToSegmentOrdinal::new(max_value); + docid_to_segment.set(0, 1); + docid_to_segment.set(1, 0); + demux_mapping.add(docid_to_segment); + + //segment ordinal 1 mapping + let mut docid_to_segment = DocidToSegmentOrdinal::new(max_value); + docid_to_segment.set(0, 1); + docid_to_segment.set(1, 1); + demux_mapping.add(docid_to_segment); + { + let bit_sets_for_demuxing_to_segment_ordinal_0 = + get_delete_bitsets(&demux_mapping, 0, &[max_value as u32, max_value as u32]); + + assert_eq!( + bit_sets_for_demuxing_to_segment_ordinal_0[0].is_deleted(0), + true + ); + assert_eq!( + bit_sets_for_demuxing_to_segment_ordinal_0[0].is_deleted(1), + false + ); + assert_eq!( + bit_sets_for_demuxing_to_segment_ordinal_0[1].is_deleted(0), + true + ); + assert_eq!( + bit_sets_for_demuxing_to_segment_ordinal_0[1].is_deleted(1), + true + ); + } + + { + let bit_sets_for_demuxing_to_segment_ordinal_1 = + get_delete_bitsets(&demux_mapping, 1, &[max_value as u32, max_value as u32]); + + assert_eq!( + bit_sets_for_demuxing_to_segment_ordinal_1[0].is_deleted(0), + false + ); + assert_eq!( + bit_sets_for_demuxing_to_segment_ordinal_1[0].is_deleted(1), + true + ); + assert_eq!( + 
bit_sets_for_demuxing_to_segment_ordinal_1[1].is_deleted(0), + false + ); + assert_eq!( + bit_sets_for_demuxing_to_segment_ordinal_1[1].is_deleted(1), + false + ); + } + } + + #[test] + fn test_demux_segments() -> crate::Result<()> { + let first_index = { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests()?; + index_writer.add_document(doc!(text_field=>"texto1")); + index_writer.add_document(doc!(text_field=>"texto2")); + index_writer.commit()?; + index + }; + + let second_index = { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests()?; + index_writer.add_document(doc!(text_field=>"texto3")); + index_writer.add_document(doc!(text_field=>"texto4")); + index_writer.delete_term(Term::from_field_text(text_field, "4")); + + index_writer.commit()?; + index + }; + + let mut segments: Vec = Vec::new(); + segments.extend(first_index.searchable_segments()?); + segments.extend(second_index.searchable_segments()?); + + let target_settings = first_index.settings().clone(); + + let mut demux_mapping = DemuxMapping::default(); + { + let max_value = 2; + //segment ordinal 0 mapping + let mut docid_to_segment = DocidToSegmentOrdinal::new(max_value); + docid_to_segment.set(0, 1); + docid_to_segment.set(1, 0); + demux_mapping.add(docid_to_segment); + + //segment ordinal 1 mapping + let mut docid_to_segment = DocidToSegmentOrdinal::new(max_value); + docid_to_segment.set(0, 1); + docid_to_segment.set(1, 1); + demux_mapping.add(docid_to_segment); + } + assert_eq!(demux_mapping.get_old_num_segments(), 2); + + let demuxed_indices = demux( + &segments, + &demux_mapping, + target_settings, + vec![RamDirectory::default(), RamDirectory::default()], + )?; + + { + let index = &demuxed_indices[0]; + + let segments = index.searchable_segments()?; + assert_eq!(segments.len(), 1); + + let segment_metas = segments[0].meta(); + assert_eq!(segment_metas.num_deleted_docs(), 0); + assert_eq!(segment_metas.num_docs(), 1); + + let searcher = index.reader().unwrap().searcher(); + { + let text_field = index.schema().get_field("text").unwrap(); + + let do_search = |term: &str| { + let query = QueryParser::for_index(&index, vec![text_field]) + .parse_query(term) + .unwrap(); + let top_docs: Vec<(f32, DocAddress)> = + searcher.search(&query, &TopDocs::with_limit(3)).unwrap(); + + top_docs.iter().map(|el| el.1.doc_id).collect::>() + }; + + assert_eq!(do_search("texto1"), vec![] as Vec); + assert_eq!(do_search("texto2"), vec![0]); + } + } + + { + let index = &demuxed_indices[1]; + + let segments = index.searchable_segments()?; + assert_eq!(segments.len(), 1); + + let segment_metas = segments[0].meta(); + assert_eq!(segment_metas.num_deleted_docs(), 0); + assert_eq!(segment_metas.num_docs(), 3); + + let searcher = index.reader().unwrap().searcher(); + { + let text_field = index.schema().get_field("text").unwrap(); + + let do_search = |term: &str| { + let query = QueryParser::for_index(&index, vec![text_field]) + .parse_query(term) + .unwrap(); + let top_docs: Vec<(f32, DocAddress)> = + searcher.search(&query, &TopDocs::with_limit(3)).unwrap(); + + top_docs.iter().map(|el| el.1.doc_id).collect::>() + }; + + assert_eq!(do_search("texto1"), vec![0]); + assert_eq!(do_search("texto2"), vec![] as Vec); + 
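+                // With the mapping above, ordinal 1 received "texto1" (old seg 0,
+                // docid 0) plus "texto3"/"texto4" (old seg 1, docids 0 and 1),
+                // hence docids 0, 1 and 2 below.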
assert_eq!(do_search("texto3"), vec![1]); + assert_eq!(do_search("texto4"), vec![2]); + } + } + + Ok(()) + } +} diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index fb9770cebd..6f5ecdca96 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,5 +1,6 @@ pub mod delete_queue; +pub mod demuxer; pub mod doc_id_mapping; mod doc_opstamp_mapping; pub mod index_writer; @@ -26,6 +27,7 @@ pub use self::prepared_commit::PreparedCommit; pub use self::segment_entry::SegmentEntry; pub use self::segment_manager::SegmentManager; pub use self::segment_serializer::SegmentSerializer; +pub use self::segment_updater::merge_filtered_segments; pub use self::segment_updater::merge_indices; pub use self::segment_writer::SegmentWriter; diff --git a/src/lib.rs b/src/lib.rs index 33477784ea..d59b088af0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -169,6 +169,8 @@ pub use crate::core::{ }; pub use crate::core::{InvertedIndexReader, SegmentReader}; pub use crate::directory::Directory; +pub use crate::indexer::demuxer::*; +pub use crate::indexer::merge_filtered_segments; pub use crate::indexer::merge_indices; pub use crate::indexer::operation::UserOperation; pub use crate::indexer::IndexWriter; From e35d0c7de10cfc4160f4122f529b2cf740e8e9f1 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 7 Sep 2021 13:35:18 +0200 Subject: [PATCH 4/9] return error instead panic --- src/indexer/demuxer.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/indexer/demuxer.rs b/src/indexer/demuxer.rs index fb75efaff6..d33ee9d2a5 100644 --- a/src/indexer/demuxer.rs +++ b/src/indexer/demuxer.rs @@ -2,7 +2,9 @@ use common::BitSet; use itertools::Itertools; use crate::fastfield::DeleteBitSet; -use crate::{merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal}; +use crate::{ + merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal, TantivyError, +}; /// DemuxMapping can be used to reorganize data from multiple segments. /// e.g. if you have two tenant ids TENANT_A and TENANT_B and two segments with @@ -137,9 +139,9 @@ pub fn demux( segments, target_settings.clone(), delete_bitsets, - output_directories - .pop() - .expect("no enough output_directories provided"), + output_directories.pop().ok_or_else(|| { + TantivyError::InvalidArgument("not enough output_directories provided".to_string()) + })?, )?; indices.push(index); } From 8894f7d872d55423637ba8fd4172d02f313fd8d1 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Fri, 24 Sep 2021 19:17:02 +0800 Subject: [PATCH 5/9] update comments --- src/indexer/demuxer.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/indexer/demuxer.rs b/src/indexer/demuxer.rs index d33ee9d2a5..733fd5cd20 100644 --- a/src/indexer/demuxer.rs +++ b/src/indexer/demuxer.rs @@ -25,6 +25,8 @@ pub struct DemuxMapping { } /// DocidToSegmentOrdinal maps from docid within a segment to the new segment ordinal for demuxing. +/// +/// For every source segment there is a `DocidToSegmentOrdinal` to distribute its docids. #[derive(Debug, Default)] pub struct DocidToSegmentOrdinal { docid_index_to_segment_ordinal: Vec, @@ -111,7 +113,7 @@ fn get_delete_bitsets( /// Demux the segments according to `demux_mapping`. See `DemuxMapping`. /// The number of output_directories need to match max new segment ordinal from `demux_mapping`. /// -/// The ordinal of `segments` need to match the ordinals in `demux_mapping`. +/// The ordinal of `segments` need to match the ordinals provided in `demux_mapping`. 
pub fn demux( segments: &[Segment], demux_mapping: &DemuxMapping, From 0bfad6dc88cad8a2e5361f31e7fa0533bd83fb6d Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Sat, 25 Sep 2021 15:39:09 +0800 Subject: [PATCH 6/9] use tinysets merging --- common/src/bitset.rs | 55 ++++++++++++++++++++++++++++- src/core/segment_reader.rs | 16 +++++---- src/fastfield/alive_bitset.rs | 32 ++++++++++++++++- src/fastfield/mod.rs | 2 +- src/indexer/demuxer.rs | 10 +++--- src/indexer/index_writer.rs | 9 +---- src/indexer/merger.rs | 14 ++++---- src/indexer/segment_updater.rs | 64 ++++++++++++++++++++++++++++------ 8 files changed, 162 insertions(+), 40 deletions(-) diff --git a/common/src/bitset.rs b/common/src/bitset.rs index 5192c502d3..f2b0ad7ffa 100644 --- a/common/src/bitset.rs +++ b/common/src/bitset.rs @@ -192,7 +192,6 @@ impl BitSet { /// Deserialize a `BitSet`. /// - #[cfg(test)] pub fn deserialize(mut data: &[u8]) -> io::Result { let max_value: u32 = u32::from_le_bytes(data[..4].try_into().unwrap()); data = &data[4..]; @@ -247,6 +246,20 @@ impl BitSet { } } + /// Intersect with serialized bitset + pub fn intersect_with(&mut self, other: &ReadSerializedBitSet) { + self.intersect_with_iter(other.iter_tinysets()); + } + + /// Intersect with tinysets + pub fn intersect_with_iter(&mut self, other: impl Iterator) { + self.len = 0; + for (left, right) in self.tinysets.iter_mut().zip(other) { + *left = left.intersect(right); + self.len += left.len() as u64; + } + } + /// Returns the number of elements in the `BitSet`. #[inline] pub fn len(&self) -> usize { @@ -403,6 +416,46 @@ mod tests { assert_eq!(bitset.len(), 4); } + #[test] + fn test_bitset_intersect() { + let bitset_serialized = { + let mut bitset = BitSet::with_max_value_and_full(5); + bitset.remove(1); + bitset.remove(3); + let mut out = vec![]; + bitset.serialize(&mut out).unwrap(); + + ReadSerializedBitSet::open(OwnedBytes::new(out)) + }; + + let mut bitset = BitSet::with_max_value_and_full(5); + bitset.remove(1); + bitset.intersect_with(&bitset_serialized); + + assert!(bitset.contains(0)); + assert!(!bitset.contains(1)); + assert!(bitset.contains(2)); + assert!(!bitset.contains(3)); + assert!(bitset.contains(4)); + + bitset.intersect_with_iter(vec![TinySet::singleton(0)].into_iter()); + + assert!(bitset.contains(0)); + assert!(!bitset.contains(1)); + assert!(!bitset.contains(2)); + assert!(!bitset.contains(3)); + assert!(!bitset.contains(4)); + assert_eq!(bitset.len(), 1); + + bitset.intersect_with_iter(vec![TinySet::singleton(1)].into_iter()); + assert!(!bitset.contains(0)); + assert!(!bitset.contains(1)); + assert!(!bitset.contains(2)); + assert!(!bitset.contains(3)); + assert!(!bitset.contains(4)); + assert_eq!(bitset.len(), 0); + } + #[test] fn test_read_serialized_bitset_empty() { let mut bitset = BitSet::with_max_value(5); diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 4c6123b000..f22e136395 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -5,7 +5,7 @@ use crate::core::SegmentId; use crate::directory::CompositeFile; use crate::directory::FileSlice; use crate::error::DataCorruption; -use crate::fastfield::merge_delete_bitset; +use crate::fastfield::merge_alive_bitset; use crate::fastfield::AliveBitSet; use crate::fastfield::FacetReader; use crate::fastfield::FastFieldReaders; @@ -71,14 +71,16 @@ impl SegmentReader { } /// Merges the passed bitset with the existing one. 
- pub fn apply_delete_bitset(&mut self, delete_bitset: DeleteBitSet) { - if let Some(existing_bitset) = self.delete_bitset_opt.as_mut() { - let merged_bitset = merge_delete_bitset(&delete_bitset, existing_bitset); - self.delete_bitset_opt = Some(merged_bitset); + pub fn apply_alive_bitset(&mut self, alive_bitset: AliveBitSet) -> crate::Result<()> { + if let Some(existing_bitset) = self.alive_bitset_opt.as_mut() { + let merged_bitset = merge_alive_bitset(&alive_bitset, existing_bitset)?; + self.num_docs = merged_bitset.num_alive_docs() as u32; + self.alive_bitset_opt = Some(merged_bitset); } else { - self.delete_bitset_opt = Some(delete_bitset); + self.num_docs = alive_bitset.num_alive_docs() as u32; + self.alive_bitset_opt = Some(alive_bitset); } - self.num_docs = self.max_doc - self.num_deleted_docs(); + Ok(()) } /// Return the number of documents that have been diff --git a/src/fastfield/alive_bitset.rs b/src/fastfield/alive_bitset.rs index 8cde895f7c..c359a2cefa 100644 --- a/src/fastfield/alive_bitset.rs +++ b/src/fastfield/alive_bitset.rs @@ -6,6 +6,22 @@ use ownedbytes::OwnedBytes; use std::io; use std::io::Write; +/// Merges (intersects) two AliveBitSet in a new one. +pub fn merge_alive_bitset(left: &AliveBitSet, right: &AliveBitSet) -> crate::Result { + let (mut merged_bitset, other) = if left.space_usage() > right.space_usage() { + (BitSet::deserialize(left.data().as_slice())?, right) + } else { + (BitSet::deserialize(right.data().as_slice())?, left) + }; + + merged_bitset.intersect_with(other.bitset()); + + let mut out = vec![]; + write_alive_bitset(&merged_bitset, &mut out)?; + + Ok(AliveBitSet::open(OwnedBytes::new(out))) +} + /// Write a alive `BitSet` /// /// where `alive_bitset` is the set of alive `DocId`. @@ -22,6 +38,7 @@ pub struct AliveBitSet { num_alive_docs: usize, bitset: ReadSerializedBitSet, num_bytes: ByteCount, + data: OwnedBytes, } impl AliveBitSet { @@ -38,14 +55,22 @@ impl AliveBitSet { Self::open(alive_bitset_bytes) } + pub(crate) fn from_bitset(bitset: &BitSet) -> AliveBitSet { + let mut out = vec![]; + write_alive_bitset(&bitset, &mut out).unwrap(); + + AliveBitSet::open(OwnedBytes::new(out)) + } + /// Opens a delete bitset given its file. pub fn open(bytes: OwnedBytes) -> AliveBitSet { let num_bytes = bytes.len(); - let bitset = ReadSerializedBitSet::open(bytes); + let bitset = ReadSerializedBitSet::open(bytes.clone()); AliveBitSet { num_alive_docs: bitset.len(), bitset, num_bytes, + data: bytes, } } @@ -82,6 +107,11 @@ impl AliveBitSet { pub fn space_usage(&self) -> ByteCount { self.num_bytes } + + /// Get underlying bytes. + pub(crate) fn data(&self) -> OwnedBytes { + self.data.clone() + } } #[cfg(test)] diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 8cce202950..3a28993bac 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -23,10 +23,10 @@ values stored. Read access performance is comparable to that of an array lookup. 
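
 A minimal read sketch (assuming a `SegmentReader` over a committed segment and
 a u64 fast field; exact reader types may differ by version):

     let ff_reader = segment_reader.fast_fields().u64(my_u64_field)?;
     let value = ff_reader.get(doc_id);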
*/ +pub use self::alive_bitset::merge_alive_bitset; pub use self::alive_bitset::write_alive_bitset; pub use self::alive_bitset::AliveBitSet; pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; -pub use self::delete::merge_delete_bitset; pub use self::error::{FastFieldNotAvailableError, Result}; pub use self::facet_reader::FacetReader; pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter}; diff --git a/src/indexer/demuxer.rs b/src/indexer/demuxer.rs index 733fd5cd20..4d507e2e63 100644 --- a/src/indexer/demuxer.rs +++ b/src/indexer/demuxer.rs @@ -1,7 +1,7 @@ use common::BitSet; use itertools::Itertools; -use crate::fastfield::DeleteBitSet; +use crate::fastfield::AliveBitSet; use crate::{ merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal, TantivyError, }; @@ -85,10 +85,10 @@ fn get_delete_bitsets( demux_mapping: &DemuxMapping, target_segment_ordinal: SegmentOrdinal, max_value_per_segment: &[u32], -) -> Vec { +) -> Vec { let mut bitsets: Vec<_> = max_value_per_segment .iter() - .map(|max_value| BitSet::with_max_value(*max_value)) + .map(|max_value| BitSet::with_max_value_and_full(*max_value)) .collect(); for (old_segment_ordinal, docid_to_new_segment) in demux_mapping.mapping.iter().enumerate() { @@ -100,13 +100,13 @@ fn get_delete_bitsets( .map(|(docid, _)| docid) { // mark document as deleted if segment ordinal is not target segment ordinal - bitset_for_segment.insert(docid as u32); + bitset_for_segment.remove(docid as u32); } } bitsets .iter() - .map(|bitset| DeleteBitSet::from_bitset(bitset, bitset.max_value())) + .map(|bitset| AliveBitSet::from_bitset(bitset)) .collect_vec() } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index cef8380e65..34c213b367 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -164,15 +164,8 @@ pub(crate) fn advance_deletes( target_opstamp, )?; - // TODO optimize - // It should be possible to do something smarter by manipulation bitsets directly - // to compute this union. if let Some(seg_alive_bitset) = segment_reader.alive_bitset() { - for doc in 0u32..max_doc { - if seg_alive_bitset.is_deleted(doc) { - alive_bitset.remove(doc); - } - } + alive_bitset.intersect_with(seg_alive_bitset.bitset()); } let num_alive_docs: u32 = alive_bitset.len() as u32; diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 30a9dd7d7a..5e6c5d05ab 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,4 +1,5 @@ use crate::error::DataCorruption; +use crate::fastfield::AliveBitSet; use crate::fastfield::CompositeFastFieldSerializer; use crate::fastfield::DynamicFastFieldReader; use crate::fastfield::FastFieldDataAccess; @@ -158,7 +159,7 @@ impl IndexMerger { segments: &[Segment], ) -> crate::Result { let delete_bitsets = segments.iter().map(|_| None).collect_vec(); - Self::open_with_custom_delete_set(schema, index_settings, segments, delete_bitsets) + Self::open_with_custom_alive_set(schema, index_settings, segments, delete_bitsets) } // Create merge with a custom delete set. @@ -169,11 +170,11 @@ impl IndexMerger { // This can be used to merge but also apply an additional filter. // One use case is demux, which is basically taking a list of // segments and partitions them e.g. by a value in a field. 
- pub fn open_with_custom_delete_set( + pub fn open_with_custom_alive_set( schema: Schema, index_settings: IndexSettings, segments: &[Segment], - delete_bitset_opt: Vec>, + alive_bitset_opt: Vec>, ) -> crate::Result { let mut readers = vec![]; let mut max_doc: u32 = 0u32; @@ -183,10 +184,9 @@ impl IndexMerger { readers.push(reader); } } - for (reader, new_delete_bitset_opt) in readers.iter_mut().zip(delete_bitset_opt.into_iter()) - { - if let Some(new_delete_bitset) = new_delete_bitset_opt { - reader.apply_delete_bitset(new_delete_bitset); + for (reader, new_alive_bitset_opt) in readers.iter_mut().zip(alive_bitset_opt.into_iter()) { + if let Some(new_alive_bitset) = new_alive_bitset_opt { + reader.apply_alive_bitset(new_alive_bitset)?; } max_doc += reader.num_docs(); } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index b47b6dc9f2..b7d8328f55 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -7,7 +7,7 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::core::META_FILEPATH; use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult}; -use crate::fastfield::DeleteBitSet; +use crate::fastfield::AliveBitSet; use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::index_writer::advance_deletes; use crate::indexer::merge_operation::MergeOperationInventory; @@ -209,7 +209,7 @@ pub fn merge_indices( pub fn merge_filtered_segments( segments: &[Segment], target_settings: IndexSettings, - filter_docids: Vec>, + filter_docids: Vec>, output_directory: Dir, ) -> crate::Result { if segments.is_empty() { @@ -239,7 +239,7 @@ pub fn merge_filtered_segments( )?; let merged_segment = merged_index.new_segment(); let merged_segment_id = merged_segment.id(); - let merger: IndexMerger = IndexMerger::open_with_custom_delete_set( + let merger: IndexMerger = IndexMerger::open_with_custom_alive_set( merged_index.schema(), merged_index.settings().clone(), &segments[..], @@ -687,7 +687,7 @@ mod tests { use super::merge_indices; use crate::collector::TopDocs; use crate::directory::RamDirectory; - use crate::fastfield::DeleteBitSet; + use crate::fastfield::AliveBitSet; use crate::indexer::merge_policy::tests::MergeWheneverPossible; use crate::indexer::merger::IndexMerger; use crate::indexer::segment_updater::merge_filtered_segments; @@ -738,6 +738,50 @@ mod tests { Ok(()) } + #[test] + fn delete_all_docs_min() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + + // writing the segment + let mut index_writer = index.writer_for_tests()?; + + for _ in 0..10 { + index_writer.add_document(doc!(text_field=>"a")); + index_writer.add_document(doc!(text_field=>"b")); + } + index_writer.commit()?; + + let seg_ids = index.searchable_segment_ids()?; + // docs exist, should have at least 1 segment + assert!(!seg_ids.is_empty()); + + let term = Term::from_field_text(text_field, "a"); + index_writer.delete_term(term); + index_writer.commit()?; + + let term = Term::from_field_text(text_field, "b"); + index_writer.delete_term(term); + index_writer.commit()?; + + index_writer.wait_merging_threads()?; + + let reader = index.reader()?; + assert_eq!(reader.searcher().num_docs(), 0); + + let seg_ids = index.searchable_segment_ids()?; + assert!(seg_ids.is_empty()); + + reader.reload()?; + assert_eq!(reader.searcher().num_docs(), 0); + // empty segments should be erased + 
assert!(index.searchable_segment_metas()?.is_empty()); + assert!(reader.searcher().segment_readers().is_empty()); + + Ok(()) + } + #[test] fn delete_all_docs() -> crate::Result<()> { let mut schema_builder = Schema::builder(); @@ -921,8 +965,8 @@ mod tests { let target_settings = first_index.settings().clone(); - let filter_segment_1 = DeleteBitSet::for_test(&[1], 2); - let filter_segment_2 = DeleteBitSet::for_test(&[0], 2); + let filter_segment_1 = AliveBitSet::for_test_from_deleted_docs(&[1], 2); + let filter_segment_2 = AliveBitSet::for_test_from_deleted_docs(&[0], 2); let filter_segments = vec![Some(filter_segment_1), Some(filter_segment_2)]; @@ -967,7 +1011,7 @@ mod tests { let target_settings = first_index.settings().clone(); - let filter_segment = DeleteBitSet::for_test(&[0], 4); + let filter_segment = AliveBitSet::for_test_from_deleted_docs(&[0], 4); let filter_segments = vec![Some(filter_segment)]; @@ -1030,7 +1074,7 @@ mod tests { let target_settings = first_index.settings().clone(); { - let filter_segment = DeleteBitSet::for_test(&[1], 4); + let filter_segment = AliveBitSet::for_test_from_deleted_docs(&[1], 4); let filter_segments = vec![Some(filter_segment)]; let target_schema = segments[0].schema(); let merged_index = Index::create( @@ -1038,7 +1082,7 @@ mod tests { target_schema.clone(), target_settings.clone(), )?; - let merger: IndexMerger = IndexMerger::open_with_custom_delete_set( + let merger: IndexMerger = IndexMerger::open_with_custom_alive_set( merged_index.schema(), merged_index.settings().clone(), &segments[..], @@ -1057,7 +1101,7 @@ mod tests { target_schema.clone(), target_settings.clone(), )?; - let merger: IndexMerger = IndexMerger::open_with_custom_delete_set( + let merger: IndexMerger = IndexMerger::open_with_custom_alive_set( merged_index.schema(), merged_index.settings().clone(), &segments[..], From 7facbf7243e986a0c4912f3e510c786bfc5ba39f Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 6 Oct 2021 09:40:33 +0800 Subject: [PATCH 7/9] renames etc --- src/core/segment_reader.rs | 36 +++++++++++++++++++++++++++-------- src/fastfield/alive_bitset.rs | 18 ++++++++---------- src/fastfield/mod.rs | 2 +- src/indexer/demuxer.rs | 29 +++++++++++++++------------- src/indexer/merger.rs | 13 ++++--------- 5 files changed, 57 insertions(+), 41 deletions(-) diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index f22e136395..053c55d188 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -5,7 +5,7 @@ use crate::core::SegmentId; use crate::directory::CompositeFile; use crate::directory::FileSlice; use crate::error::DataCorruption; -use crate::fastfield::merge_alive_bitset; +use crate::fastfield::union_alive_bitset; use crate::fastfield::AliveBitSet; use crate::fastfield::FacetReader; use crate::fastfield::FastFieldReaders; @@ -73,7 +73,7 @@ impl SegmentReader { /// Merges the passed bitset with the existing one. pub fn apply_alive_bitset(&mut self, alive_bitset: AliveBitSet) -> crate::Result<()> { if let Some(existing_bitset) = self.alive_bitset_opt.as_mut() { - let merged_bitset = merge_alive_bitset(&alive_bitset, existing_bitset)?; + let merged_bitset = union_alive_bitset(&alive_bitset, existing_bitset)?; self.num_docs = merged_bitset.num_alive_docs() as u32; self.alive_bitset_opt = Some(merged_bitset); } else { @@ -154,6 +154,14 @@ impl SegmentReader { /// Open a new segment for reading. 
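+    /// (Delegates to `open_with_custom_alive_set` with no custom alive bitset.)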
pub fn open(segment: &Segment) -> crate::Result { + Self::open_with_custom_alive_set(segment, None) + } + + /// Open a new segment for reading. + pub fn open_with_custom_alive_set( + segment: &Segment, + custom_bitset: Option, + ) -> crate::Result { let termdict_file = segment.open_read(SegmentComponent::Terms)?; let termdict_composite = CompositeFile::open(&termdict_file)?; @@ -178,22 +186,34 @@ impl SegmentReader { let fast_fields_composite = CompositeFile::open(&fast_fields_data)?; let fast_field_readers = Arc::new(FastFieldReaders::new(schema.clone(), fast_fields_composite)); - let fieldnorm_data = segment.open_read(SegmentComponent::FieldNorms)?; let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?; + let mut num_docs = segment.meta().num_docs(); + let max_doc = segment.meta().max_doc(); + let alive_bitset_opt = if segment.meta().has_deletes() { - let alive_bitset_bytes = segment.open_read(SegmentComponent::Delete)?.read_bytes()?; - let alive_bitset = AliveBitSet::open(alive_bitset_bytes); + let delete_data = segment.open_read(SegmentComponent::Delete)?; + let mut alive_bitset = AliveBitSet::open(delete_data.read_bytes()?); + + if let Some(provided_bitset) = custom_bitset { + alive_bitset = union_alive_bitset(&alive_bitset, &provided_bitset)?; + num_docs = alive_bitset.num_alive_docs() as u32; + } Some(alive_bitset) } else { - None + if let Some(provided_bitset) = custom_bitset { + num_docs = provided_bitset.num_alive_docs() as u32; + Some(provided_bitset) + } else { + None + } }; Ok(SegmentReader { inv_idx_reader_cache: Default::default(), - max_doc: segment.meta().max_doc(), - num_docs: segment.meta().num_docs(), + num_docs, + max_doc, termdict_composite, postings_composite, fast_fields_readers: fast_field_readers, diff --git a/src/fastfield/alive_bitset.rs b/src/fastfield/alive_bitset.rs index c359a2cefa..f8befc17b4 100644 --- a/src/fastfield/alive_bitset.rs +++ b/src/fastfield/alive_bitset.rs @@ -7,19 +7,17 @@ use std::io; use std::io::Write; /// Merges (intersects) two AliveBitSet in a new one. -pub fn merge_alive_bitset(left: &AliveBitSet, right: &AliveBitSet) -> crate::Result { - let (mut merged_bitset, other) = if left.space_usage() > right.space_usage() { - (BitSet::deserialize(left.data().as_slice())?, right) - } else { - (BitSet::deserialize(right.data().as_slice())?, left) - }; +/// The two bitsets need to have the same max_value. +pub fn union_alive_bitset(left: &AliveBitSet, right: &AliveBitSet) -> crate::Result { + assert_eq!(left.bitset().max_value(), right.bitset().max_value()); - merged_bitset.intersect_with(other.bitset()); + let mut merged_bitset = BitSet::deserialize(left.data().as_slice())?; + merged_bitset.intersect_with(right.bitset()); - let mut out = vec![]; - write_alive_bitset(&merged_bitset, &mut out)?; + let mut alive_bitset_buffer = vec![]; + write_alive_bitset(&merged_bitset, &mut alive_bitset_buffer)?; - Ok(AliveBitSet::open(OwnedBytes::new(out))) + Ok(AliveBitSet::open(OwnedBytes::new(alive_bitset_buffer))) } /// Write a alive `BitSet` diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 3a28993bac..640c886609 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -23,7 +23,7 @@ values stored. Read access performance is comparable to that of an array lookup. 
*/ -pub use self::alive_bitset::merge_alive_bitset; +pub use self::alive_bitset::union_alive_bitset; pub use self::alive_bitset::write_alive_bitset; pub use self::alive_bitset::AliveBitSet; pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; diff --git a/src/indexer/demuxer.rs b/src/indexer/demuxer.rs index 4d507e2e63..3d92f768b0 100644 --- a/src/indexer/demuxer.rs +++ b/src/indexer/demuxer.rs @@ -5,8 +5,11 @@ use crate::fastfield::AliveBitSet; use crate::{ merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal, TantivyError, }; - /// DemuxMapping can be used to reorganize data from multiple segments. +/// +/// DemuxMapping is useful in a multitenant settings, in which each document might actually belong to a different tenant. +/// It allows to reorganize documents as follows: +/// /// e.g. if you have two tenant ids TENANT_A and TENANT_B and two segments with /// the documents (simplified) /// Seg 1 [TENANT_A, TENANT_B] @@ -36,7 +39,7 @@ impl DocidToSegmentOrdinal { /// Creates a new DocidToSegmentOrdinal with size of num_docids. /// Initially all docids point to segment ordinal 0 and need to be set /// the via `set` method. - pub fn new(num_docids: usize) -> Self { + pub fn with_num_docs(num_docids: usize) -> Self { let mut vec = vec![]; vec.resize(num_docids, 0); @@ -51,8 +54,8 @@ impl DocidToSegmentOrdinal { } /// Iterates over the new SegmentOrdinal in the order of the docid. - pub fn iter(&self) -> impl Iterator { - self.docid_index_to_segment_ordinal.iter() + pub fn iter(&self) -> impl Iterator + '_ { + self.docid_index_to_segment_ordinal.iter().cloned() } } @@ -81,7 +84,7 @@ impl DemuxMapping { } } -fn get_delete_bitsets( +fn get_alive_bitsets( demux_mapping: &DemuxMapping, target_segment_ordinal: SegmentOrdinal, max_value_per_segment: &[u32], @@ -96,7 +99,7 @@ fn get_delete_bitsets( for docid in docid_to_new_segment .iter() .enumerate() - .filter(|(_docid, new_segment_ordinal)| **new_segment_ordinal != target_segment_ordinal) + .filter(|(_docid, new_segment_ordinal)| *new_segment_ordinal != target_segment_ordinal) .map(|(docid, _)| docid) { // mark document as deleted if segment ordinal is not target segment ordinal @@ -128,7 +131,7 @@ pub fn demux( let mut indices = vec![]; for target_segment_ordinal in 0..output_directories.len() { - let delete_bitsets = get_delete_bitsets( + let delete_bitsets = get_alive_bitsets( demux_mapping, target_segment_ordinal as u32, &max_value_per_segment, @@ -167,19 +170,19 @@ mod tests { let max_value = 2; let mut demux_mapping = DemuxMapping::default(); //segment ordinal 0 mapping - let mut docid_to_segment = DocidToSegmentOrdinal::new(max_value); + let mut docid_to_segment = DocidToSegmentOrdinal::with_num_docs(max_value); docid_to_segment.set(0, 1); docid_to_segment.set(1, 0); demux_mapping.add(docid_to_segment); //segment ordinal 1 mapping - let mut docid_to_segment = DocidToSegmentOrdinal::new(max_value); + let mut docid_to_segment = DocidToSegmentOrdinal::with_num_docs(max_value); docid_to_segment.set(0, 1); docid_to_segment.set(1, 1); demux_mapping.add(docid_to_segment); { let bit_sets_for_demuxing_to_segment_ordinal_0 = - get_delete_bitsets(&demux_mapping, 0, &[max_value as u32, max_value as u32]); + get_alive_bitsets(&demux_mapping, 0, &[max_value as u32, max_value as u32]); assert_eq!( bit_sets_for_demuxing_to_segment_ordinal_0[0].is_deleted(0), @@ -201,7 +204,7 @@ mod tests { { let bit_sets_for_demuxing_to_segment_ordinal_1 = - get_delete_bitsets(&demux_mapping, 1, &[max_value as u32, max_value as 
u32]); + get_alive_bitsets(&demux_mapping, 1, &[max_value as u32, max_value as u32]); assert_eq!( bit_sets_for_demuxing_to_segment_ordinal_1[0].is_deleted(0), @@ -258,13 +261,13 @@ mod tests { { let max_value = 2; //segment ordinal 0 mapping - let mut docid_to_segment = DocidToSegmentOrdinal::new(max_value); + let mut docid_to_segment = DocidToSegmentOrdinal::with_num_docs(max_value); docid_to_segment.set(0, 1); docid_to_segment.set(1, 0); demux_mapping.add(docid_to_segment); //segment ordinal 1 mapping - let mut docid_to_segment = DocidToSegmentOrdinal::new(max_value); + let mut docid_to_segment = DocidToSegmentOrdinal::with_num_docs(max_value); docid_to_segment.set(0, 1); docid_to_segment.set(1, 1); demux_mapping.add(docid_to_segment); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 5e6c5d05ab..c2789994f9 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -177,19 +177,14 @@ impl IndexMerger { alive_bitset_opt: Vec>, ) -> crate::Result { let mut readers = vec![]; - let mut max_doc: u32 = 0u32; - for segment in segments { + for (segment, new_alive_bitset_opt) in segments.iter().zip(alive_bitset_opt.into_iter()) { if segment.meta().num_docs() > 0 { - let reader = SegmentReader::open(segment)?; + let reader = + SegmentReader::open_with_custom_alive_set(segment, new_alive_bitset_opt)?; readers.push(reader); } } - for (reader, new_alive_bitset_opt) in readers.iter_mut().zip(alive_bitset_opt.into_iter()) { - if let Some(new_alive_bitset) = new_alive_bitset_opt { - reader.apply_alive_bitset(new_alive_bitset)?; - } - max_doc += reader.num_docs(); - } + let max_doc = readers.iter().map(|reader| reader.num_docs()).sum(); if let Some(sort_by_field) = index_settings.sort_by_field.as_ref() { readers = Self::sort_readers_by_min_sort_field(readers, sort_by_field)?; } From 3e281a079fc236e60328d30e0586844383fa018a Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 6 Oct 2021 10:50:48 +0800 Subject: [PATCH 8/9] renames etc. 
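
At a glance (summarizing the diffs below):

- BitSet::intersect_with -> BitSet::intersect_update; the TinySet-iterator
  variant becomes private
- the now-unused SegmentReader::apply_alive_bitset is removed (superseded by
  open_with_custom_alive_set in the previous patch)
- get_alive_bitsets now inserts alive docids into an empty BitSet instead of
  removing deleted ones from a full one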
---
 common/src/bitset.rs          | 12 ++++++------
 src/core/segment_reader.rs    | 13 -------------
 src/fastfield/alive_bitset.rs |  2 +-
 src/indexer/demuxer.rs        | 19 +++++++------------
 src/indexer/index_writer.rs   |  2 +-
 src/indexer/merger.rs         |  4 ++++
 6 files changed, 19 insertions(+), 33 deletions(-)

diff --git a/common/src/bitset.rs b/common/src/bitset.rs
index f2b0ad7ffa..6aa3b357ac 100644
--- a/common/src/bitset.rs
+++ b/common/src/bitset.rs
@@ -247,12 +247,12 @@ impl BitSet {
     }
 
     /// Intersect with serialized bitset
-    pub fn intersect_with(&mut self, other: &ReadSerializedBitSet) {
-        self.intersect_with_iter(other.iter_tinysets());
+    pub fn intersect_update(&mut self, other: &ReadSerializedBitSet) {
+        self.intersect_update_with_iter(other.iter_tinysets());
     }
 
     /// Intersect with tinysets
-    pub fn intersect_with_iter(&mut self, other: impl Iterator<Item = TinySet>) {
+    fn intersect_update_with_iter(&mut self, other: impl Iterator<Item = TinySet>) {
         self.len = 0;
         for (left, right) in self.tinysets.iter_mut().zip(other) {
             *left = left.intersect(right);
@@ -430,7 +430,7 @@ mod tests {
         let mut bitset = BitSet::with_max_value_and_full(5);
         bitset.remove(1);
 
-        bitset.intersect_with(&bitset_serialized);
+        bitset.intersect_update(&bitset_serialized);
 
         assert!(bitset.contains(0));
         assert!(!bitset.contains(1));
         assert!(bitset.contains(2));
         assert!(!bitset.contains(3));
         assert!(bitset.contains(4));
 
-        bitset.intersect_with_iter(vec![TinySet::singleton(0)].into_iter());
+        bitset.intersect_update_with_iter(vec![TinySet::singleton(0)].into_iter());
 
         assert!(bitset.contains(0));
         assert!(!bitset.contains(1));
         assert!(!bitset.contains(2));
         assert!(!bitset.contains(3));
         assert!(!bitset.contains(4));
         assert_eq!(bitset.len(), 1);
 
-        bitset.intersect_with_iter(vec![TinySet::singleton(1)].into_iter());
+        bitset.intersect_update_with_iter(vec![TinySet::singleton(1)].into_iter());
         assert!(!bitset.contains(0));
         assert!(!bitset.contains(1));
         assert!(!bitset.contains(2));
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index 053c55d188..ce0dd5fc66 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -70,19 +70,6 @@ impl SegmentReader {
         &self.schema
     }
 
-    /// Merges the passed bitset with the existing one.
-    pub fn apply_alive_bitset(&mut self, alive_bitset: AliveBitSet) -> crate::Result<()> {
-        if let Some(existing_bitset) = self.alive_bitset_opt.as_mut() {
-            let merged_bitset = union_alive_bitset(&alive_bitset, existing_bitset)?;
-            self.num_docs = merged_bitset.num_alive_docs() as u32;
-            self.alive_bitset_opt = Some(merged_bitset);
-        } else {
-            self.num_docs = alive_bitset.num_alive_docs() as u32;
-            self.alive_bitset_opt = Some(alive_bitset);
-        }
-        Ok(())
-    }
-
     /// Return the number of documents that have been
     /// deleted in the segment.
     pub fn num_deleted_docs(&self) -> DocId {
diff --git a/src/fastfield/alive_bitset.rs b/src/fastfield/alive_bitset.rs
index f8befc17b4..8263ce35e2 100644
--- a/src/fastfield/alive_bitset.rs
+++ b/src/fastfield/alive_bitset.rs
@@ -12,7 +12,7 @@ pub fn union_alive_bitset(left: &AliveBitSet, right: &AliveBitSet) -> crate::Result<AliveBitSet> {
     assert_eq!(left.bitset().max_value(), right.bitset().max_value());
 
     let mut merged_bitset = BitSet::deserialize(left.data().as_slice())?;
-    merged_bitset.intersect_with(right.bitset());
+    merged_bitset.intersect_update(right.bitset());
 
     let mut alive_bitset_buffer = vec![];
     write_alive_bitset(&merged_bitset, &mut alive_bitset_buffer)?;
diff --git a/src/indexer/demuxer.rs b/src/indexer/demuxer.rs
index 3d92f768b0..d4337b1244 100644
--- a/src/indexer/demuxer.rs
+++ b/src/indexer/demuxer.rs
@@ -2,9 +2,7 @@ use common::BitSet;
 use itertools::Itertools;
 
 use crate::fastfield::AliveBitSet;
-use crate::{
-    merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal,
-    TantivyError,
-};
+use crate::{merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal};
 /// DemuxMapping can be used to reorganize data from multiple segments.
 ///
 /// DemuxMapping is useful in a multitenant setting, in which each document might actually belong to a different tenant.
@@ -91,7 +89,7 @@ fn get_alive_bitsets(
 ) -> Vec<AliveBitSet> {
     let mut bitsets: Vec<_> = max_value_per_segment
         .iter()
-        .map(|max_value| BitSet::with_max_value_and_full(*max_value))
+        .map(|max_value| BitSet::with_max_value(*max_value))
         .collect();
 
     for (old_segment_ordinal, docid_to_new_segment) in demux_mapping.mapping.iter().enumerate() {
@@ -99,11 +97,11 @@ fn get_alive_bitsets(
         for docid in docid_to_new_segment
             .iter()
             .enumerate()
-            .filter(|(_docid, new_segment_ordinal)| *new_segment_ordinal != target_segment_ordinal)
+            .filter(|(_docid, new_segment_ordinal)| *new_segment_ordinal == target_segment_ordinal)
             .map(|(docid, _)| docid)
         {
-            // mark document as deleted if segment ordinal is not target segment ordinal
-            bitset_for_segment.remove(docid as u32);
+            // add document if segment ordinal = target segment ordinal
+            bitset_for_segment.insert(docid as u32);
         }
     }
 
@@ -123,14 +121,13 @@ pub fn demux(
     target_settings: IndexSettings,
     mut output_directories: Vec<Dir>,
 ) -> crate::Result<Vec<Index>> {
-    output_directories.reverse();
     let max_value_per_segment = segments
         .iter()
         .map(|seg| seg.meta().max_doc())
         .collect_vec();
 
     let mut indices = vec![];
-    for target_segment_ordinal in 0..output_directories.len() {
+    for (target_segment_ordinal, output_directory) in output_directories.into_iter().enumerate() {
         let delete_bitsets = get_alive_bitsets(
             demux_mapping,
             target_segment_ordinal as u32,
             &max_value_per_segment,
@@ -144,9 +141,7 @@ pub fn demux(
             segments,
             target_settings.clone(),
             delete_bitsets,
-            output_directories.pop().ok_or_else(|| {
-                TantivyError::InvalidArgument("not enough output_directories provided".to_string())
-            })?,
+            output_directory,
         )?;
         indices.push(index);
     }
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 34c213b367..cffe4aae13 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -165,7 +165,7 @@ pub(crate) fn advance_deletes(
     )?;
 
     if let Some(seg_alive_bitset) = segment_reader.alive_bitset() {
-        alive_bitset.intersect_with(seg_alive_bitset.bitset());
+        alive_bitset.intersect_update(seg_alive_bitset.bitset());
     }
 
     let num_alive_docs: u32 = alive_bitset.len() as u32;
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index c2789994f9..4a5dade6df 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -167,6 +167,10 @@ impl IndexMerger {
     // will be merged with the existing bit set. Make sure the index
     // corresponds to the segment index.
     //
+    // If `None` is provided for the custom alive set, the regular alive set will be used.
+    // If a delete bitset is provided, the union of the provided and the regular
+    // alive set will be used.
+    //
     // This can be used to merge but also apply an additional filter.
     // One use case is demux, which basically takes a list of
     // segments and partitions them, e.g. by a value in a field.

From ce11858b6874ee7dca7b678a15c364eaac1e8b9e Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 6 Oct 2021 12:21:37 +0900
Subject: [PATCH 9/9] Minor renaming

---
 CHANGELOG.md                            |   1 +
 common/src/bitset.rs                    |   2 +-
 src/core/index.rs                       |   2 +-
 src/core/searcher.rs                    |   2 +-
 src/core/segment_reader.rs              |   1 +
 src/fastfield/alive_bitset.rs           |   5 +-
 src/indexer/demuxer.rs                  | 182 +++++++++++-------------
 src/indexer/doc_id_mapping.rs           |   8 +-
 src/indexer/merger.rs                   |  72 +++++-----
 src/indexer/merger_sorted_index_test.rs |   4 +-
 src/indexer/segment_updater.rs          |  16 +--
 src/store/mod.rs                        |   4 +-
 src/store/reader.rs                     |   4 +-
 src/termdict/merger.rs                  |   2 +-
 14 files changed, 144 insertions(+), 161 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bfaa9c7217..d5b9240ccc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 Tantivy 0.16.1
 ========================
 - Major Bugfix on multivalued fastfield. #1151
+- Demux operation (@PSeitz)
 
 Tantivy 0.16.0
 =========================
diff --git a/common/src/bitset.rs b/common/src/bitset.rs
index 6aa3b357ac..a6795d13a2 100644
--- a/common/src/bitset.rs
+++ b/common/src/bitset.rs
@@ -349,7 +349,7 @@ impl ReadSerializedBitSet {
     /// Iterate the tinyset on the fly from serialized data.
     ///
     #[inline]
-    fn iter_tinysets<'a>(&'a self) -> impl Iterator<Item = TinySet> + 'a {
+    fn iter_tinysets(&self) -> impl Iterator<Item = TinySet> + '_ {
         assert!((self.data.len()) % 8 == 0);
         self.data.chunks_exact(8).map(move |chunk| {
             let tinyset: TinySet = TinySet::deserialize(chunk.try_into().unwrap()).unwrap();
diff --git a/src/core/index.rs b/src/core/index.rs
index 65f00fa18f..e1ad16b1f3 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -439,7 +439,7 @@ impl Index {
     /// Creates a multithreaded writer
     ///
     /// Tantivy will automatically define the number of threads to use, but
-    /// no more than [`MAX_NUM_THREAD`] threads.
+    /// no more than 8 threads.
     /// `overall_heap_size_in_bytes` is the total target memory usage that will be split
     /// between a given number of threads.
 ///
diff --git a/src/core/searcher.rs b/src/core/searcher.rs
index b2a1dd8f62..f4ca148b9f 100644
--- a/src/core/searcher.rs
+++ b/src/core/searcher.rs
@@ -88,7 +88,7 @@ impl Searcher {
         &self.segment_readers
     }
 
-    /// Returns the segment_reader associated with the given segment_ordinal
+    /// Returns the segment_reader associated with the given segment_ord
     pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
         &self.segment_readers[segment_ord as usize]
     }
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index ce0dd5fc66..fdaf9e0c83 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -184,6 +184,7 @@ impl SegmentReader {
             let mut alive_bitset = AliveBitSet::open(delete_data.read_bytes()?);
             if let Some(provided_bitset) = custom_bitset {
+                assert_eq!(max_doc, provided_bitset.bitset().max_value());
                 alive_bitset = union_alive_bitset(&alive_bitset, &provided_bitset)?;
                 num_docs = alive_bitset.num_alive_docs() as u32;
             }
diff --git a/src/fastfield/alive_bitset.rs b/src/fastfield/alive_bitset.rs
index 8263ce35e2..b7354df47a 100644
--- a/src/fastfield/alive_bitset.rs
+++ b/src/fastfield/alive_bitset.rs
@@ -55,8 +55,7 @@ impl AliveBitSet {
     pub(crate) fn from_bitset(bitset: &BitSet) -> AliveBitSet {
         let mut out = vec![];
-        write_alive_bitset(&bitset, &mut out).unwrap();
-
+        write_alive_bitset(bitset, &mut out).unwrap();
         AliveBitSet::open(OwnedBytes::new(out))
     }
 
@@ -84,7 +83,7 @@ impl AliveBitSet {
         !self.is_alive(doc)
     }
 
-    /// Iterate over the alive docids.
+    /// Iterate over the alive doc_ids.
     #[inline]
     pub fn iter_alive(&self) -> impl Iterator<Item = DocId> + '_ {
         self.bitset.iter()
diff --git a/src/indexer/demuxer.rs b/src/indexer/demuxer.rs
index d4337b1244..77a2f1e3a8 100644
--- a/src/indexer/demuxer.rs
+++ b/src/indexer/demuxer.rs
@@ -18,61 +18,52 @@ use crate::{merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal};
 /// Seg 2 [TENANT_B, TENANT_B]
 ///
 /// Demuxing is the tool for that.
-/// Semantically you can define a mapping from [old segment ordinal, old docid] -> [new segment ordinal].
+/// Semantically you can define a mapping from [old segment ordinal, old doc_id] -> [new segment ordinal].
 #[derive(Debug, Default)]
 pub struct DemuxMapping {
-    /// [index old segment ordinal] -> [index docid] = new segment ordinal
-    mapping: Vec<DocidToSegmentOrdinal>,
+    /// [index old segment ordinal] -> [index doc_id] = new segment ordinal
+    mapping: Vec<DocIdToSegmentOrdinal>,
 }
 
-/// DocidToSegmentOrdinal maps from docid within a segment to the new segment ordinal for demuxing.
+/// DocIdToSegmentOrdinal maps from doc_id within a segment to the new segment ordinal for demuxing.
 ///
-/// For every source segment there is a `DocidToSegmentOrdinal` to distribute its docids.
+/// For every source segment there is a `DocIdToSegmentOrdinal` to distribute its doc_ids.
 #[derive(Debug, Default)]
-pub struct DocidToSegmentOrdinal {
-    docid_index_to_segment_ordinal: Vec<SegmentOrdinal>,
+pub struct DocIdToSegmentOrdinal {
+    doc_id_index_to_segment_ord: Vec<SegmentOrdinal>,
 }
 
-impl DocidToSegmentOrdinal {
-    /// Creates a new DocidToSegmentOrdinal with size of num_docids.
-    /// Initially all docids point to segment ordinal 0 and need to be set
+impl DocIdToSegmentOrdinal {
+    /// Creates a new DocIdToSegmentOrdinal with size of num_doc_ids.
+    /// Initially all doc_ids point to segment ordinal 0 and need to be set
     /// via the `set` method.
-    pub fn with_num_docs(num_docids: usize) -> Self {
-        let mut vec = vec![];
-        vec.resize(num_docids, 0);
-
-        DocidToSegmentOrdinal {
-            docid_index_to_segment_ordinal: vec,
+    pub fn with_max_doc(max_doc: usize) -> Self {
+        DocIdToSegmentOrdinal {
+            doc_id_index_to_segment_ord: vec![0; max_doc],
         }
     }
 
-    /// Associated a docids with a the new SegmentOrdinal.
-    pub fn set(&mut self, docid: u32, segment_ordinal: SegmentOrdinal) {
-        self.docid_index_to_segment_ordinal[docid as usize] = segment_ordinal;
+    /// Returns the number of documents in this mapping.
+    /// It should be equal to the `max_doc` of the segment it targets.
+    pub fn max_doc(&self) -> u32 {
+        self.doc_id_index_to_segment_ord.len() as u32
+    }
+
+    /// Associates a doc_id with an output `SegmentOrdinal`.
+    pub fn set(&mut self, doc_id: u32, segment_ord: SegmentOrdinal) {
+        self.doc_id_index_to_segment_ord[doc_id as usize] = segment_ord;
     }
 
-    /// Iterates over the new SegmentOrdinal in the order of the docid.
+    /// Iterates over the new SegmentOrdinal in the order of the doc_id.
     pub fn iter(&self) -> impl Iterator<Item = SegmentOrdinal> + '_ {
-        self.docid_index_to_segment_ordinal.iter().cloned()
+        self.doc_id_index_to_segment_ord.iter().cloned()
     }
 }
 
 impl DemuxMapping {
-    /// Creates a new empty DemuxMapping.
-    pub fn empty() -> Self {
-        DemuxMapping {
-            mapping: Default::default(),
-        }
-    }
-
-    /// Creates a DemuxMapping from existing mapping data.
-    pub fn new(mapping: Vec<DocidToSegmentOrdinal>) -> Self {
-        DemuxMapping { mapping }
-    }
-
-    /// Adds a DocidToSegmentOrdinal. The order of the push calls
+    /// Adds a DocIdToSegmentOrdinal. The order of the push calls
     /// defines the old segment ordinal. e.g. first push = ordinal 0.
-    pub fn add(&mut self, segment_mapping: DocidToSegmentOrdinal) {
+    pub fn add(&mut self, segment_mapping: DocIdToSegmentOrdinal) {
         self.mapping.push(segment_mapping);
     }
 
@@ -82,32 +73,33 @@ impl DemuxMapping {
     }
 }
 
-fn get_alive_bitsets(
-    demux_mapping: &DemuxMapping,
-    target_segment_ordinal: SegmentOrdinal,
-    max_value_per_segment: &[u32],
-) -> Vec<AliveBitSet> {
-    let mut bitsets: Vec<_> = max_value_per_segment
+fn docs_for_segment_ord(
+    doc_id_to_segment_ord: &DocIdToSegmentOrdinal,
+    target_segment_ord: SegmentOrdinal,
+) -> AliveBitSet {
+    let mut bitset = BitSet::with_max_value(doc_id_to_segment_ord.max_doc());
+    for doc_id in doc_id_to_segment_ord
         .iter()
-        .map(|max_value| BitSet::with_max_value(*max_value))
-        .collect();
-
-    for (old_segment_ordinal, docid_to_new_segment) in demux_mapping.mapping.iter().enumerate() {
-        let bitset_for_segment = &mut bitsets[old_segment_ordinal];
-        for docid in docid_to_new_segment
-            .iter()
-            .enumerate()
-            .filter(|(_docid, new_segment_ordinal)| *new_segment_ordinal == target_segment_ordinal)
-            .map(|(docid, _)| docid)
-        {
-            // add document if segment ordinal = target segment ordinal
-            bitset_for_segment.insert(docid as u32);
-        }
+        .enumerate()
+        .filter(|(_doc_id, new_segment_ord)| *new_segment_ord == target_segment_ord)
+        .map(|(doc_id, _)| doc_id)
+    {
+        // add document if segment ordinal = target segment ordinal
+        bitset.insert(doc_id as u32);
     }
+    AliveBitSet::from_bitset(&bitset)
+}
 
-    bitsets
+fn get_alive_bitsets(
+    demux_mapping: &DemuxMapping,
+    target_segment_ord: SegmentOrdinal,
+) -> Vec<AliveBitSet> {
+    demux_mapping
+        .mapping
         .iter()
-        .map(|bitset| AliveBitSet::from_bitset(bitset))
+        .map(|doc_id_to_segment_ord| {
+            docs_for_segment_ord(doc_id_to_segment_ord, target_segment_ord)
+        })
         .collect_vec()
 }
 
@@ -119,24 +111,14 @@ pub fn demux(
     segments: &[Segment],
     demux_mapping: &DemuxMapping,
     target_settings: IndexSettings,
-    mut output_directories: Vec<Dir>,
+    output_directories: Vec<Dir>,
 ) -> crate::Result<Vec<Index>> {
-    let max_value_per_segment = segments
-        .iter()
-        .map(|seg| seg.meta().max_doc())
-        .collect_vec();
-
     let mut indices = vec![];
-    for (target_segment_ordinal, output_directory) in output_directories.into_iter().enumerate() {
-        let delete_bitsets = get_alive_bitsets(
-            demux_mapping,
-            target_segment_ordinal as u32,
-            &max_value_per_segment,
-        )
-        .into_iter()
-        .map(|bitset| Some(bitset))
-        .collect_vec();
-
+    for (target_segment_ord, output_directory) in output_directories.into_iter().enumerate() {
+        let delete_bitsets = get_alive_bitsets(demux_mapping, target_segment_ord as u32)
+            .into_iter()
+            .map(|bitset| Some(bitset))
+            .collect_vec();
         let index = merge_filtered_segments(
             segments,
             target_settings.clone(),
             delete_bitsets,
             output_directory,
         )?;
         indices.push(index);
     }
@@ -161,60 +143,60 @@ mod tests {
     use super::*;
 
     #[test]
-    fn demux_map_to_deletebitset_test() {
+    fn test_demux_map_to_deletebitset() {
         let max_value = 2;
         let mut demux_mapping = DemuxMapping::default();
         //segment ordinal 0 mapping
-        let mut docid_to_segment = DocidToSegmentOrdinal::with_num_docs(max_value);
-        docid_to_segment.set(0, 1);
-        docid_to_segment.set(1, 0);
-        demux_mapping.add(docid_to_segment);
+        let mut doc_id_to_segment = DocIdToSegmentOrdinal::with_max_doc(max_value);
+        doc_id_to_segment.set(0, 1);
+        doc_id_to_segment.set(1, 0);
+        demux_mapping.add(doc_id_to_segment);
 
         //segment ordinal 1 mapping
-        let mut docid_to_segment = DocidToSegmentOrdinal::with_num_docs(max_value);
-        docid_to_segment.set(0, 1);
-        docid_to_segment.set(1, 1);
-        demux_mapping.add(docid_to_segment);
+        let mut doc_id_to_segment = DocIdToSegmentOrdinal::with_max_doc(max_value);
+        doc_id_to_segment.set(0, 1);
+        doc_id_to_segment.set(1, 1);
+        demux_mapping.add(doc_id_to_segment);
 
         {
-            let bit_sets_for_demuxing_to_segment_ordinal_0 =
-                get_alive_bitsets(&demux_mapping, 0, &[max_value as u32, max_value as u32]);
+            let bit_sets_for_demuxing_to_segment_ord_0 =
+                get_alive_bitsets(&demux_mapping, 0);
 
             assert_eq!(
-                bit_sets_for_demuxing_to_segment_ordinal_0[0].is_deleted(0),
+                bit_sets_for_demuxing_to_segment_ord_0[0].is_deleted(0),
                 true
             );
             assert_eq!(
-                bit_sets_for_demuxing_to_segment_ordinal_0[0].is_deleted(1),
+                bit_sets_for_demuxing_to_segment_ord_0[0].is_deleted(1),
                 false
             );
             assert_eq!(
-                bit_sets_for_demuxing_to_segment_ordinal_0[1].is_deleted(0),
+                bit_sets_for_demuxing_to_segment_ord_0[1].is_deleted(0),
                 true
             );
             assert_eq!(
-                bit_sets_for_demuxing_to_segment_ordinal_0[1].is_deleted(1),
+                bit_sets_for_demuxing_to_segment_ord_0[1].is_deleted(1),
                 true
             );
         }
 
         {
-            let bit_sets_for_demuxing_to_segment_ordinal_1 =
-                get_alive_bitsets(&demux_mapping, 1, &[max_value as u32, max_value as u32]);
+            let bit_sets_for_demuxing_to_segment_ord_1 =
+                get_alive_bitsets(&demux_mapping, 1);
 
             assert_eq!(
-                bit_sets_for_demuxing_to_segment_ordinal_1[0].is_deleted(0),
+                bit_sets_for_demuxing_to_segment_ord_1[0].is_deleted(0),
                 false
             );
             assert_eq!(
-                bit_sets_for_demuxing_to_segment_ordinal_1[0].is_deleted(1),
+                bit_sets_for_demuxing_to_segment_ord_1[0].is_deleted(1),
                 true
            );
             assert_eq!(
-                bit_sets_for_demuxing_to_segment_ordinal_1[1].is_deleted(0),
+                bit_sets_for_demuxing_to_segment_ord_1[1].is_deleted(0),
                 false
             );
             assert_eq!(
-                bit_sets_for_demuxing_to_segment_ordinal_1[1].is_deleted(1),
+                bit_sets_for_demuxing_to_segment_ord_1[1].is_deleted(1),
                 false
             );
         }
 
         {
             let max_value = 2;
             //segment ordinal 0 mapping
-            let mut docid_to_segment = DocidToSegmentOrdinal::with_num_docs(max_value);
-            docid_to_segment.set(0, 1);
-            docid_to_segment.set(1, 0);
-            demux_mapping.add(docid_to_segment);
+            let mut doc_id_to_segment = DocIdToSegmentOrdinal::with_max_doc(max_value);
+            doc_id_to_segment.set(0, 1);
+            doc_id_to_segment.set(1, 0);
+            demux_mapping.add(doc_id_to_segment);
 
             //segment ordinal 1 mapping
-            let mut docid_to_segment = DocidToSegmentOrdinal::with_num_docs(max_value);
-            docid_to_segment.set(0, 1);
-            docid_to_segment.set(1, 1);
-            demux_mapping.add(docid_to_segment);
+            let mut doc_id_to_segment = DocIdToSegmentOrdinal::with_max_doc(max_value);
+            doc_id_to_segment.set(0, 1);
+            doc_id_to_segment.set(1, 1);
+            demux_mapping.add(doc_id_to_segment);
         }
 
         assert_eq!(demux_mapping.get_old_num_segments(), 2);
diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs
index 350b451396..8ebb4772b9 100644
--- a/src/indexer/doc_id_mapping.rs
+++ b/src/indexer/doc_id_mapping.rs
@@ -11,12 +11,12 @@ use std::{cmp::Reverse, ops::Index};
 
 /// Struct to provide mapping from new doc_id to old doc_id and segment.
 #[derive(Clone)]
-pub(crate) struct SegmentDocidMapping {
+pub(crate) struct SegmentDocIdMapping {
     new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
     is_trivial: bool,
 }
 
-impl SegmentDocidMapping {
+impl SegmentDocIdMapping {
     pub(crate) fn new(
         new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
         is_trivial: bool,
@@ -40,14 +40,14 @@ impl SegmentDocidMapping {
         self.is_trivial
     }
 }
-impl Index<usize> for SegmentDocidMapping {
+impl Index<usize> for SegmentDocIdMapping {
     type Output = (DocId, SegmentOrdinal);
 
     fn index(&self, idx: usize) -> &Self::Output {
         &self.new_doc_id_to_old_and_segment[idx]
     }
 }
-impl IntoIterator for SegmentDocidMapping {
+impl IntoIterator for SegmentDocIdMapping {
     type Item = (DocId, SegmentOrdinal);
     type IntoIter = std::vec::IntoIter<Self::Item>;
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 4a5dade6df..eb7be4c167 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -10,7 +10,7 @@ use crate::fastfield::MultiValuedFastFieldReader;
 use crate::fieldnorm::FieldNormsSerializer;
 use crate::fieldnorm::FieldNormsWriter;
 use crate::fieldnorm::{FieldNormReader, FieldNormReaders};
-use crate::indexer::doc_id_mapping::SegmentDocidMapping;
+use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
 use crate::indexer::SegmentSerializer;
 use crate::postings::Postings;
 use crate::postings::{InvertedIndexSerializer, SegmentPostings};
@@ -236,7 +236,7 @@ impl IndexMerger {
     fn write_fieldnorms(
         &self,
         mut fieldnorms_serializer: FieldNormsSerializer,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         let fields = FieldNormsWriter::fields_with_fieldnorm(&self.schema);
         let mut fieldnorms_data = Vec::with_capacity(self.max_doc as usize);
@@ -264,7 +264,7 @@ impl IndexMerger {
         &self,
         fast_field_serializer: &mut CompositeFastFieldSerializer,
         mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         debug_time!("write_fast_fields");
 
@@ -315,7 +315,7 @@ impl IndexMerger {
         &self,
         field: Field,
         fast_field_serializer: &mut CompositeFastFieldSerializer,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         let (min_value, max_value) = self.readers.iter().map(|reader|{
             let u64_reader: DynamicFastFieldReader<u64> = reader
@@ -347,17 +347,17 @@ impl IndexMerger {
             num_vals: doc_id_mapping.len() as u64,
         };
         #[derive(Clone)]
-        struct SortedDocidFieldAccessProvider<'a> {
-            doc_id_mapping: &'a SegmentDocidMapping,
+        struct SortedDocIdFieldAccessProvider<'a> {
+            doc_id_mapping: &'a SegmentDocIdMapping,
             fast_field_readers: &'a Vec<DynamicFastFieldReader<u64>>,
         }
-        impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> {
+        impl<'a> FastFieldDataAccess for SortedDocIdFieldAccessProvider<'a> {
             fn get_val(&self, doc: u64) -> u64 {
                 let (doc_id, reader_ordinal) = self.doc_id_mapping[doc as usize];
                 self.fast_field_readers[reader_ordinal as usize].get(doc_id)
             }
         }
-        let fastfield_accessor = SortedDocidFieldAccessProvider {
+        let fastfield_accessor = SortedDocIdFieldAccessProvider {
             doc_id_mapping,
             fast_field_readers: &fast_field_readers,
         };
@@ -439,7 +439,7 @@ impl IndexMerger {
     pub(crate) fn generate_doc_id_mapping(
         &self,
         sort_by_field: &IndexSortByField,
-    ) -> crate::Result<SegmentDocidMapping> {
+    ) -> crate::Result<SegmentDocIdMapping> {
         let reader_ordinal_and_field_accessors =
             self.get_reader_with_sort_field_accessor(sort_by_field)?;
         // Loading the field accessor on demand causes a 15x regression
@@ -482,7 +482,7 @@ impl IndexMerger {
                 })
                 .map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id)),
         );
-        Ok(SegmentDocidMapping::new(sorted_doc_ids, false))
+        Ok(SegmentDocIdMapping::new(sorted_doc_ids, false))
     }
 
     // Creating the index file to point into the data, generic over `BytesFastFieldReader` and
     fn write_1_n_fast_field_idx_generic<T: MultiValueLength>(
         field: Field,
         fast_field_serializer: &mut CompositeFastFieldSerializer,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
         reader_and_field_accessors: &[(&SegmentReader, T)],
     ) -> crate::Result<Vec<u64>> {
         let mut total_num_vals = 0u64;
@@ -550,7 +550,7 @@ impl IndexMerger {
         &self,
         field: Field,
         fast_field_serializer: &mut CompositeFastFieldSerializer,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<Vec<u64>> {
         let reader_ordinal_and_field_accessors = self.readers.iter().map(|reader|{
             let u64s_reader: MultiValuedFastFieldReader<u64> = reader.fast_fields()
@@ -572,7 +572,7 @@ impl IndexMerger {
         field: Field,
         term_ordinal_mappings: &TermOrdinalMapping,
         fast_field_serializer: &mut CompositeFastFieldSerializer,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         debug_time!("write_hierarchical_facet_field");
 
@@ -621,7 +621,7 @@ impl IndexMerger {
     /// Creates a mapping if the segments are stacked. This is helpful to merge code paths
     /// between index sorting and the others.
-    pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result<SegmentDocidMapping> {
+    pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result<SegmentDocIdMapping> {
         let total_num_new_docs = self
             .readers
             .iter()
                 })
                 .flatten(),
         );
-        Ok(SegmentDocidMapping::new(mapping, true))
+        Ok(SegmentDocIdMapping::new(mapping, true))
     }
 
     fn write_multi_fast_field(
         &self,
         field: Field,
         fast_field_serializer: &mut CompositeFastFieldSerializer,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         // Multifastfield consists of 2 fastfields.
         // The first serves as an index into the second one and is strictly increasing.
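
To make the idx/vals layout described by that comment concrete, here is a
standalone, runnable sketch; the values are invented for illustration and do
not come from this diff:

    fn main() {
        // Index fastfield: one strictly increasing offset per doc plus a final
        // end offset; doc d's values are vals[idx[d] as usize..idx[d + 1] as usize].
        let idx: Vec<u64> = vec![0, 2, 3, 5]; // three docs owning 2, 1 and 2 values
        let vals: Vec<u64> = vec![7, 8, 9, 4, 2]; // all values, concatenated
        let doc = 0;
        let values = &vals[idx[doc] as usize..idx[doc + 1] as usize];
        assert_eq!(values, &[7, 8][..]);
    }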
@@ -703,16 +703,16 @@ impl IndexMerger {
             min_value,
         };
 
-        struct SortedDocidMultiValueAccessProvider<'a> {
-            doc_id_mapping: &'a SegmentDocidMapping,
+        struct SortedDocIdMultiValueAccessProvider<'a> {
+            doc_id_mapping: &'a SegmentDocIdMapping,
             fast_field_readers: &'a Vec<MultiValuedFastFieldReader<u64>>,
             offsets: Vec<u64>,
         }
-        impl<'a> FastFieldDataAccess for SortedDocidMultiValueAccessProvider<'a> {
+        impl<'a> FastFieldDataAccess for SortedDocIdMultiValueAccessProvider<'a> {
             fn get_val(&self, pos: u64) -> u64 {
                 // use the offsets index to find the doc_id which will contain the position.
                 // the offsets are strictly increasing so we can do a simple search on it.
-                let new_docid = self
+                let new_doc_id = self
                     .offsets
                     .iter()
                     .position(|&offset| offset > pos)
                     - 1;
 
                 // now we need to find the position of `pos` in the multivalued bucket
-                let num_pos_covered_until_now = self.offsets[new_docid];
+                let num_pos_covered_until_now = self.offsets[new_doc_id];
                 let pos_in_values = pos - num_pos_covered_until_now;
 
-                let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_docid as usize];
+                let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_doc_id as usize];
                 let num_vals = self.fast_field_readers[reader_ordinal as usize].get_len(old_doc_id);
                 assert!(num_vals >= pos_in_values);
                 let mut vals = vec![];
@@ -732,7 +732,7 @@ impl IndexMerger {
                 vals[pos_in_values as usize]
             }
         }
-        let fastfield_accessor = SortedDocidMultiValueAccessProvider {
+        let fastfield_accessor = SortedDocIdMultiValueAccessProvider {
             doc_id_mapping,
             fast_field_readers: &ff_readers,
             offsets,
@@ -771,7 +771,7 @@ impl IndexMerger {
         &self,
         field: Field,
         fast_field_serializer: &mut CompositeFastFieldSerializer,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         let reader_and_field_accessors = self
             .readers
@@ -807,7 +807,7 @@ impl IndexMerger {
         field_type: &FieldType,
         serializer: &mut InvertedIndexSerializer,
         fieldnorm_reader: Option<FieldNormReader>,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<Option<TermOrdinalMapping>> {
         debug_time!("write_postings_for_field");
         let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
@@ -846,8 +846,8 @@ impl IndexMerger {
                 segment_local_map
             })
             .collect();
-        for (new_doc_id, (old_doc_id, segment_ordinal)) in doc_id_mapping.iter().enumerate() {
-            let segment_map = &mut merged_doc_id_map[*segment_ordinal as usize];
+        for (new_doc_id, (old_doc_id, segment_ord)) in doc_id_mapping.iter().enumerate() {
+            let segment_map = &mut merged_doc_id_map[*segment_ord as usize];
             segment_map[*old_doc_id as usize] = Some(new_doc_id as DocId);
         }
 
@@ -889,7 +889,7 @@ impl IndexMerger {
             let mut total_doc_freq = 0;
 
             // Let's compute the list of non-empty posting lists
-            for (segment_ord, term_info) in merged_terms.current_segment_ordinals_and_term_infos() {
+            for (segment_ord, term_info) in merged_terms.current_segment_ords_and_term_infos() {
                 let segment_reader = &self.readers[segment_ord];
                 let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
                 let segment_postings = inverted_index
@@ -939,9 +939,9 @@ impl IndexMerger {
                         // there is at least one document.
                         let term_freq = segment_postings.term_freq();
                         segment_postings.positions(&mut positions_buffer);
-                        // if doc_id_mapping exists, the docids are reordered, they are
+                        // if doc_id_mapping exists, the doc_ids are reordered, they are
                         // not just stacked. The field serializer expects monotonically increasing
-                        // docids, so we collect and sort them first, before writing.
+                        // doc_ids, so we collect and sort them first, before writing.
                         //
                         // I think this is not strictly necessary, it would be possible to
                         // avoid the loading into a vec via some form of kmerge, but then the merge
@@ -981,7 +981,7 @@ impl IndexMerger {
         &self,
         serializer: &mut InvertedIndexSerializer,
         fieldnorm_readers: FieldNormReaders,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<HashMap<Field, TermOrdinalMapping>> {
         let mut term_ordinal_mappings = HashMap::new();
         for (field, field_entry) in self.schema.fields() {
@@ -1004,7 +1004,7 @@ impl IndexMerger {
     fn write_storable_fields(
         &self,
         store_writer: &mut StoreWriter,
-        doc_id_mapping: &SegmentDocidMapping,
+        doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         debug_time!("write_storable_fields");
 
@@ -1596,7 +1596,7 @@ mod tests {
 
     #[test]
     fn test_merge_facets_sort_asc() {
-        // In the merge case this will go through the docid mapping code
+        // In the merge case this will go through the doc_id mapping code
         test_merge_facets(
             Some(IndexSettings {
                 sort_by_field: Some(IndexSortByField {
            }),
            true,
        );
-        // In the merge case this will not go through the docid mapping code, because the data is
+        // In the merge case this will not go through the doc_id mapping code, because the data is
         // sorted and disjunct
         test_merge_facets(
             Some(IndexSettings {
@@ -1623,7 +1623,7 @@ mod tests {
 
     #[test]
     fn test_merge_facets_sort_desc() {
-        // In the merge case this will go through the docid mapping code
+        // In the merge case this will go through the doc_id mapping code
         test_merge_facets(
             Some(IndexSettings {
                 sort_by_field: Some(IndexSortByField {
            }),
            true,
        );
-        // In the merge case this will not go through the docid mapping code, because the data is
+        // In the merge case this will not go through the doc_id mapping code, because the data is
         // sorted and disjunct
         test_merge_facets(
             Some(IndexSettings {
diff --git a/src/indexer/merger_sorted_index_test.rs b/src/indexer/merger_sorted_index_test.rs
index 7443d1e380..867de1250e 100644
--- a/src/indexer/merger_sorted_index_test.rs
+++ b/src/indexer/merger_sorted_index_test.rs
@@ -554,7 +554,7 @@ mod bench_sorted_index_merge {
                 .expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
             (doc_id, reader, u64_reader)
         });
-        // add values in order of the new docids
+        // add values in order of the new doc_ids
         let mut val = 0;
         for (doc_id, _reader, field_reader) in sorted_doc_ids {
             val = field_reader.get(*doc_id);
@@ -567,7 +567,7 @@ mod bench_sorted_index_merge {
         Ok(())
     }
     #[bench]
-    fn create_sorted_index_create_docid_mapping(b: &mut Bencher) -> crate::Result<()> {
+    fn create_sorted_index_create_doc_id_mapping(b: &mut Bencher) -> crate::Result<()> {
         let sort_by_field = IndexSortByField {
             field: "intval".to_string(),
             order: Order::Desc,
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index b7d8328f55..11db76de40 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -194,7 +194,7 @@ pub fn merge_indices(
 }
 
 /// Advanced: Merges a list of segments from different indices into a new index.
-/// Additional you can provide a delete bitset for each segment to ignore docids.
+/// Additionally, you can provide a delete bitset for each segment to ignore doc_ids.
 ///
 /// Returns `TantivyError` if the indices list is empty or their
 /// schemas don't match.
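
As a usage sketch for this function: the `segments` slice, the single
four-document source segment, the `RamDirectory` output and the default
`IndexSettings` below are assumptions for illustration, not part of the patch:

    // Keep every doc of the only source segment except doc_ids 1 and 3.
    let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[1, 3], 4);
    let merged_index = merge_filtered_segments(
        &segments, // &[Segment], assumed to exist
        IndexSettings::default(),
        vec![Some(alive_bitset)], // one Option<AliveBitSet> per source segment
        RamDirectory::create(),
    )?;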
@@ -209,7 +209,7 @@ pub fn merge_filtered_segments(
     segments: &[Segment],
     target_settings: IndexSettings,
-    filter_docids: Vec<Option<AliveBitSet>>,
+    filter_doc_ids: Vec<Option<AliveBitSet>>,
     output_directory: Dir,
 ) -> crate::Result<Index> {
     if segments.is_empty() {
@@ -243,7 +243,7 @@ pub fn merge_filtered_segments(
         merged_index.schema(),
         merged_index.settings().clone(),
         &segments[..],
-        filter_docids,
+        filter_doc_ids,
     )?;
     let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
     let num_docs = merger.write(segment_serializer)?;
@@ -1051,7 +1051,7 @@ mod tests {
     }
 
     #[test]
-    fn test_apply_docid_filter_in_merger() -> crate::Result<()> {
+    fn test_apply_doc_id_filter_in_merger() -> crate::Result<()> {
         let first_index = {
             let mut schema_builder = Schema::builder();
             let text_field = schema_builder.add_text_field("text", TEXT);
@@ -1089,8 +1089,8 @@ mod tests {
                 filter_segments,
             )?;
 
-            let docids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
-            assert_eq!(docids_alive, vec![0, 2]);
+            let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
+            assert_eq!(doc_ids_alive, vec![0, 2]);
         }
 
         {
@@ -1108,8 +1108,8 @@ mod tests {
                 filter_segments,
             )?;
 
-            let docids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
-            assert_eq!(docids_alive, vec![0, 1, 2]);
+            let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
+            assert_eq!(doc_ids_alive, vec![0, 1, 2]);
         }
 
         Ok(())
diff --git a/src/store/mod.rs b/src/store/mod.rs
index c6a1dda7d0..8452cbef45 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -112,9 +112,9 @@ pub mod tests {
     #[test]
     fn test_doc_store_iter_with_delete_bug_1077() -> crate::Result<()> {
         // this will cover deletion of the first element in a checkpoint
-        let deleted_docids = (200..300).collect::<Vec<_>>();
+        let deleted_doc_ids = (200..300).collect::<Vec<_>>();
         let alive_bitset =
-            AliveBitSet::for_test_from_deleted_docs(&deleted_docids, NUM_DOCS as u32);
+            AliveBitSet::for_test_from_deleted_docs(&deleted_doc_ids, NUM_DOCS as u32);
         let path = Path::new("store");
         let directory = RamDirectory::create();
diff --git a/src/store/reader.rs b/src/store/reader.rs
index 98c127d978..c27c88d152 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -151,7 +151,7 @@ impl StoreReader {
         &'b self,
         alive_bitset: Option<&'a AliveBitSet>,
     ) -> impl Iterator<Item = crate::Result<Document>> + 'b {
-        let last_docid = self
+        let last_doc_id = self
             .block_checkpoints()
             .last()
             .map(|checkpoint| checkpoint.doc_range.end)
@@ -164,7 +164,7 @@ impl StoreReader {
         let mut block_start_pos = 0;
         let mut num_skipped = 0;
         let mut reset_block_pos = false;
-        (0..last_docid)
+        (0..last_doc_id)
             .filter_map(move |doc_id| {
                 // filter_map is only used to resolve lifetime issues between the two closures on
                 // the outer variables
diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs
index 2c3d2e18e0..18d7de61d3 100644
--- a/src/termdict/merger.rs
+++ b/src/termdict/merger.rs
@@ -77,7 +77,7 @@ impl<'a> TermMerger<'a> {
     /// This method may be called
     /// iff advance() has been called before
     /// and "true" was returned.
-    pub fn current_segment_ordinals_and_term_infos<'b: 'a>(
+    pub fn current_segment_ords_and_term_infos<'b: 'a>(
         &'b self,
     ) -> impl 'b + Iterator {
         self.current_segment_and_term_ordinals