From 8ea4653c9b980afb98a0a9288b51beb6e3da6438 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 17 May 2021 16:35:49 +0200 Subject: [PATCH 1/6] fix and refactor log merge policy, fixes #1035 fixes a bug in log merge policy where an index was wrongly referenced by its index --- src/indexer/log_merge_policy.rs | 104 +++++++++++++++++++++++++------- 1 file changed, 81 insertions(+), 23 deletions(-) diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index b0a2ba100f..077e6946e0 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -54,40 +54,35 @@ impl LogMergePolicy { impl MergePolicy for LogMergePolicy { fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec { - let mut size_sorted_tuples = segments + let mut size_sorted_segments = segments .iter() - .map(SegmentMeta::num_docs) - .enumerate() - .filter(|(_, s)| s <= &(self.max_merge_size as u32)) - .collect::>(); + .filter(|s| s.num_docs() <= (self.max_merge_size as u32)) + .collect::>(); - size_sorted_tuples.sort_by(|x, y| y.1.cmp(&(x.1))); - - if size_sorted_tuples.len() <= 1 { - return Vec::new(); + if size_sorted_segments.len() <= 1 { + return vec![]; } + size_sorted_segments.sort_by_key(|seg| seg.num_docs()); - let size_sorted_log_tuples: Vec<_> = size_sorted_tuples + let sorted_segments_with_log_size: Vec<_> = size_sorted_segments .into_iter() - .map(|(ind, num_docs)| (ind, f64::from(self.clip_min_size(num_docs)).log2())) + .map(|seg| (seg, f64::from(self.clip_min_size(seg.num_docs())).log2())) .collect(); - if let Some(&(first_ind, first_score)) = size_sorted_log_tuples.first() { - let mut current_max_log_size = first_score; - let mut levels = vec![vec![first_ind]]; - for &(ind, score) in (&size_sorted_log_tuples).iter().skip(1) { - if score < (current_max_log_size - self.level_log_size) { - current_max_log_size = score; + if let Some(&(first_segment, log_size)) = sorted_segments_with_log_size.first() { + let mut current_max_log_size 
= log_size; + let mut levels = vec![vec![first_segment]]; + for &(segment, segment_log_size) in (&sorted_segments_with_log_size).iter().skip(1) { + if segment_log_size < (current_max_log_size - self.level_log_size) { + current_max_log_size = segment_log_size; levels.push(Vec::new()); } - levels.last_mut().unwrap().push(ind); + levels.last_mut().unwrap().push(segment); } levels .iter() .filter(|level| level.len() >= self.min_merge_size) - .map(|ind_vec| { - MergeCandidate(ind_vec.iter().map(|&ind| segments[ind].id()).collect()) - }) + .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect())) .collect() } else { return vec![]; @@ -109,12 +104,75 @@ impl Default for LogMergePolicy { #[cfg(test)] mod tests { use super::*; - use crate::core::{SegmentId, SegmentMeta, SegmentMetaInventory}; - use crate::indexer::merge_policy::MergePolicy; + use crate::{ + core::{SegmentId, SegmentMeta, SegmentMetaInventory}, + schema, + }; + use crate::{indexer::merge_policy::MergePolicy, schema::INDEXED}; use once_cell::sync::Lazy; static INVENTORY: Lazy = Lazy::new(SegmentMetaInventory::default); + use crate::Index; + + #[test] + fn create_index_test_max_merge_issue_1035() { + let mut schema_builder = schema::Schema::builder(); + let int_field = schema_builder.add_u64_field("intval", INDEXED); + let schema = schema_builder.build(); + + let index = Index::create_in_ram(schema); + + { + let mut log_merge_policy = LogMergePolicy::default(); + log_merge_policy.set_min_merge_size(1); + log_merge_policy.set_max_merge_size(1); + log_merge_policy.set_min_layer_size(0); + + let mut index_writer = index.writer_for_tests().unwrap(); + index_writer.set_merge_policy(Box::new(log_merge_policy)); + + // after every commit the merge checker is started, it will merge only segments with 1 + // element in it because of the max_merge_size. 
+ index_writer.add_document(doc!(int_field=>1_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>2_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>3_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>4_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>5_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>6_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>7_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>8_u64)); + assert!(index_writer.commit().is_ok()); + } + + let _segment_ids = index + .searchable_segment_ids() + .expect("Searchable segments failed."); + + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + let segment_readers = searcher.segment_readers(); + for segment in segment_readers { + if segment.num_docs() > 2 { + panic!("segment can't have more than two segments"); + } // don't know how to wait for the merge, then it could be a simple eq + } + } + fn test_merge_policy() -> LogMergePolicy { let mut log_merge_policy = LogMergePolicy::default(); log_merge_policy.set_min_merge_size(3); From e5e820feec2193dd9786d3fdd96cb63ed9e2bf4c Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 18 May 2021 06:55:10 +0200 Subject: [PATCH 2/6] cleanup --- src/indexer/log_merge_policy.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 077e6946e0..27ef32e037 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -56,7 +56,7 @@ impl MergePolicy for LogMergePolicy { fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec { let mut size_sorted_segments = segments .iter() - .filter(|s| s.num_docs() <= 
(self.max_merge_size as u32)) + .filter(|segment_meta| segment_meta.num_docs() <= (self.max_merge_size as u32)) .collect::>(); if size_sorted_segments.len() <= 1 { @@ -72,7 +72,7 @@ impl MergePolicy for LogMergePolicy { if let Some(&(first_segment, log_size)) = sorted_segments_with_log_size.first() { let mut current_max_log_size = log_size; let mut levels = vec![vec![first_segment]]; - for &(segment, segment_log_size) in (&sorted_segments_with_log_size).iter().skip(1) { + for &(segment, segment_log_size) in sorted_segments_with_log_size.iter().skip(1) { if segment_log_size < (current_max_log_size - self.level_log_size) { current_max_log_size = segment_log_size; levels.push(Vec::new()); From 6473a8c11d3962e6532a1460819c274b1842b5aa Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 18 May 2021 09:33:43 +0200 Subject: [PATCH 3/6] fix sort order, improve method names --- src/directory/mmap_directory.rs | 2 +- src/indexer/log_merge_policy.rs | 41 +++++++++++++++++---------------- src/indexer/merger.rs | 2 +- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index d929c524a9..be06032713 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -593,7 +593,7 @@ mod tests { let mut index_writer = index.writer_for_tests().unwrap(); let mut log_merge_policy = LogMergePolicy::default(); - log_merge_policy.set_min_merge_size(3); + log_merge_policy.set_min_num_segments(3); index_writer.set_merge_policy(Box::new(log_merge_policy)); for _num_commits in 0..10 { for _ in 0..10 { diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 27ef32e037..12b7b3edf4 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -12,8 +12,8 @@ const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000; /// documents. 
#[derive(Debug, Clone)] pub struct LogMergePolicy { - min_merge_size: usize, - max_merge_size: usize, + min_num_segments: usize, + max_docs_before_merge: usize, min_layer_size: u32, level_log_size: f64, } @@ -23,15 +23,16 @@ impl LogMergePolicy { cmp::max(self.min_layer_size, size) } - /// Set the minimum number of segment that may be merge together. - pub fn set_min_merge_size(&mut self, min_merge_size: usize) { - self.min_merge_size = min_merge_size; + /// Set the minimum number of segments that may be merged together in a layer. + pub fn set_min_num_segments(&mut self, min_num_segments: usize) { + self.min_num_segments = min_num_segments; } /// Set the maximum number docs in a segment for it to be considered for - /// merging. - pub fn set_max_merge_size(&mut self, max_merge_size: usize) { - self.max_merge_size = max_merge_size; + /// merging. A segment can still reach more than max_docs, by merging many + /// smaller ones. + pub fn set_max_docs_before_merge(&mut self, max_docs_merge_size: usize) { + self.max_docs_before_merge = max_docs_merge_size; } /// Set the minimum segment size under which all segment belong @@ -42,7 +43,7 @@ impl LogMergePolicy { /// Set the ratio between two consecutive levels. /// - /// Segment are group in levels according to their sizes. + /// Segments are grouped in levels according to their sizes. /// These levels are defined as intervals of exponentially growing sizes. 
/// level_log_size define the factor by which one should multiply the limit /// to reach a level, in order to get the limit to reach the following @@ -56,22 +57,22 @@ impl MergePolicy for LogMergePolicy { fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec { let mut size_sorted_segments = segments .iter() - .filter(|segment_meta| segment_meta.num_docs() <= (self.max_merge_size as u32)) + .filter(|segment_meta| segment_meta.num_docs() <= (self.max_docs_before_merge as u32)) .collect::>(); if size_sorted_segments.len() <= 1 { return vec![]; } - size_sorted_segments.sort_by_key(|seg| seg.num_docs()); + size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs())); let sorted_segments_with_log_size: Vec<_> = size_sorted_segments .into_iter() .map(|seg| (seg, f64::from(self.clip_min_size(seg.num_docs())).log2())) .collect(); - if let Some(&(first_segment, log_size)) = sorted_segments_with_log_size.first() { + if let Some(&(largest_segment, log_size)) = sorted_segments_with_log_size.first() { let mut current_max_log_size = log_size; - let mut levels = vec![vec![first_segment]]; + let mut levels = vec![vec![largest_segment]]; for &(segment, segment_log_size) in sorted_segments_with_log_size.iter().skip(1) { if segment_log_size < (current_max_log_size - self.level_log_size) { current_max_log_size = segment_log_size; @@ -81,7 +82,7 @@ impl MergePolicy for LogMergePolicy { } levels .iter() - .filter(|level| level.len() >= self.min_merge_size) + .filter(|level| level.len() >= self.min_num_segments) .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect())) .collect() } else { @@ -93,8 +94,8 @@ impl MergePolicy for LogMergePolicy { impl Default for LogMergePolicy { fn default() -> LogMergePolicy { LogMergePolicy { - min_merge_size: DEFAULT_MIN_MERGE_SIZE, - max_merge_size: DEFAULT_MAX_MERGE_SIZE, + min_num_segments: DEFAULT_MIN_MERGE_SIZE, + max_docs_before_merge: DEFAULT_MAX_MERGE_SIZE, min_layer_size: DEFAULT_MIN_LAYER_SIZE, 
level_log_size: DEFAULT_LEVEL_LOG_SIZE, } @@ -125,8 +126,8 @@ mod tests { { let mut log_merge_policy = LogMergePolicy::default(); - log_merge_policy.set_min_merge_size(1); - log_merge_policy.set_max_merge_size(1); + log_merge_policy.set_min_num_segments(1); + log_merge_policy.set_max_docs_before_merge(1); log_merge_policy.set_min_layer_size(0); let mut index_writer = index.writer_for_tests().unwrap(); @@ -175,8 +176,8 @@ mod tests { fn test_merge_policy() -> LogMergePolicy { let mut log_merge_policy = LogMergePolicy::default(); - log_merge_policy.set_min_merge_size(3); - log_merge_policy.set_max_merge_size(100_000); + log_merge_policy.set_min_num_segments(3); + log_merge_policy.set_max_docs_before_merge(100_000); log_merge_policy.set_min_layer_size(2); log_merge_policy } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index b65b5431a0..33c9246359 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1828,7 +1828,7 @@ mod tests { // Make sure we'll attempt to merge every created segment let mut policy = crate::indexer::LogMergePolicy::default(); - policy.set_min_merge_size(2); + policy.set_min_num_segments(2); writer.set_merge_policy(Box::new(policy)); for i in 0..100 { From 93997a8b8e66da7a1a62411eda4a845f6c0ec781 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 18 May 2021 09:55:31 +0200 Subject: [PATCH 4/6] use itertools groupby, fix serialization test --- src/indexer/index_writer.rs | 2 +- src/indexer/log_merge_policy.rs | 41 ++++++++++++++++----------------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index dee4fefbc6..718f5203f1 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -945,7 +945,7 @@ mod tests { let index_writer = index.writer(3_000_000).unwrap(); assert_eq!( format!("{:?}", index_writer.get_merge_policy()), - "LogMergePolicy { min_merge_size: 8, max_merge_size: 10000000, min_layer_size: 10000, \ + "LogMergePolicy { 
min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \ level_log_size: 0.75 }" ); let merge_policy = Box::new(NoMergePolicy::default()); diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 12b7b3edf4..68345aee78 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -1,5 +1,6 @@ use super::merge_policy::{MergeCandidate, MergePolicy}; use crate::core::SegmentMeta; +use itertools::Itertools; use std::cmp; use std::f64; @@ -65,29 +66,27 @@ impl MergePolicy for LogMergePolicy { } size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs())); - let sorted_segments_with_log_size: Vec<_> = size_sorted_segments - .into_iter() - .map(|seg| (seg, f64::from(self.clip_min_size(seg.num_docs())).log2())) - .collect(); - - if let Some(&(largest_segment, log_size)) = sorted_segments_with_log_size.first() { - let mut current_max_log_size = log_size; - let mut levels = vec![vec![largest_segment]]; - for &(segment, segment_log_size) in sorted_segments_with_log_size.iter().skip(1) { - if segment_log_size < (current_max_log_size - self.level_log_size) { - current_max_log_size = segment_log_size; - levels.push(Vec::new()); - } - levels.last_mut().unwrap().push(segment); + let mut current_max_log_size = f64::MAX; + let mut levels = vec![]; + for (_, merge_group) in &size_sorted_segments.into_iter().group_by(|segment| { + let segment_log_size = f64::from(self.clip_min_size(segment.num_docs())).log2(); + if segment_log_size < (current_max_log_size - self.level_log_size) { + // update current_max_log_size to create a new group + current_max_log_size = segment_log_size; + current_max_log_size + } else { + // return current_max_log_size to be grouped to the current group + current_max_log_size } - levels - .iter() - .filter(|level| level.len() >= self.min_num_segments) - .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect())) - .collect() - } else { - return vec![]; + }) 
{ + levels.push(merge_group.collect::>()); } + + levels + .iter() + .filter(|level| level.len() >= self.min_num_segments) + .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect())) + .collect() } } From 6cbd946657d64d1ac9d10b66a7d785597e44b6ce Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 18 May 2021 10:24:38 +0200 Subject: [PATCH 5/6] minor improvements --- src/indexer/log_merge_policy.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 68345aee78..3214962a94 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -24,7 +24,7 @@ impl LogMergePolicy { cmp::max(self.min_layer_size, size) } - /// Set the minimum number of segments that may be merged together in a layer. + /// Set the minimum number of segments that may be merged together. pub fn set_min_num_segments(&mut self, min_num_segments: usize) { self.min_num_segments = min_num_segments; } @@ -73,11 +73,9 @@ impl MergePolicy for LogMergePolicy { if segment_log_size < (current_max_log_size - self.level_log_size) { // update current_max_log_size to create a new group current_max_log_size = segment_log_size; - current_max_log_size - } else { - // return current_max_log_size to be grouped to the current group - current_max_log_size } + // return current_max_log_size to be grouped to the current group + current_max_log_size }) { levels.push(merge_group.collect::>()); } From b2c6a49aa6459b8a0d95b78aff2d4ac764c8c128 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 18 May 2021 10:27:11 +0200 Subject: [PATCH 6/6] update names --- src/indexer/log_merge_policy.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 3214962a94..a7051ca324 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -6,8 +6,8 @@ use std::f64; const 
DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75; const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000; -const DEFAULT_MIN_MERGE_SIZE: usize = 8; -const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000; +const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8; +const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000; /// `LogMergePolicy` tries to merge segments that have a similar number of /// documents. @@ -91,8 +91,8 @@ impl MergePolicy for LogMergePolicy { impl Default for LogMergePolicy { fn default() -> LogMergePolicy { LogMergePolicy { - min_num_segments: DEFAULT_MIN_MERGE_SIZE, - max_docs_before_merge: DEFAULT_MAX_MERGE_SIZE, + min_num_segments: DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE, + max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE, min_layer_size: DEFAULT_MIN_LAYER_SIZE, level_log_size: DEFAULT_LEVEL_LOG_SIZE, }