fix and refactor log merge policy, fixes #1035 (#1043)
* fix and refactor log merge policy, fixes #1035

fixes a bug in the log merge policy where a segment was wrongly referenced by its index

* cleanup

* fix sort order, improve method names

* use itertools groupby, fix serialization test

* minor improvements

* update names
PSeitz authored May 19, 2021
1 parent 249bc6c commit bcd72e5
Showing 4 changed files with 108 additions and 52 deletions.
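
Before the per-file diffs, a minimal sketch of the renamed configuration API in use (hedged: the public module path `tantivy::merge_policy` is assumed from upstream tantivy and is not shown in this diff; the setter names and values mirror `test_merge_policy` below):

use tantivy::merge_policy::LogMergePolicy;

fn configured_policy() -> LogMergePolicy {
    let mut policy = LogMergePolicy::default();
    // Formerly set_min_merge_size: a level must contain at least this many
    // segments before a merge of that level is proposed.
    policy.set_min_num_segments(3);
    // Formerly set_max_merge_size: segments above this doc count are skipped
    // as merge inputs; a merged output may still exceed it.
    policy.set_max_docs_before_merge(100_000);
    // Segments this small are all clipped into the same lowest level.
    policy.set_min_layer_size(2);
    policy
}

The policy is then installed with index_writer.set_merge_policy(Box::new(configured_policy())), as in the tests changed below.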
2 changes: 1 addition & 1 deletion src/directory/mmap_directory.rs
@@ -593,7 +593,7 @@ mod tests {

         let mut index_writer = index.writer_for_tests().unwrap();
         let mut log_merge_policy = LogMergePolicy::default();
-        log_merge_policy.set_min_merge_size(3);
+        log_merge_policy.set_min_num_segments(3);
         index_writer.set_merge_policy(Box::new(log_merge_policy));
         for _num_commits in 0..10 {
             for _ in 0..10 {
2 changes: 1 addition & 1 deletion src/indexer/index_writer.rs
@@ -945,7 +945,7 @@ mod tests {
         let index_writer = index.writer(3_000_000).unwrap();
         assert_eq!(
             format!("{:?}", index_writer.get_merge_policy()),
-            "LogMergePolicy { min_merge_size: 8, max_merge_size: 10000000, min_layer_size: 10000, \
+            "LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \
             level_log_size: 0.75 }"
         );
         let merge_policy = Box::new(NoMergePolicy::default());
154 changes: 105 additions & 49 deletions src/indexer/log_merge_policy.rs
@@ -1,19 +1,20 @@
 use super::merge_policy::{MergeCandidate, MergePolicy};
 use crate::core::SegmentMeta;
+use itertools::Itertools;
 use std::cmp;
 use std::f64;

 const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
 const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
-const DEFAULT_MIN_MERGE_SIZE: usize = 8;
-const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
+const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8;
+const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000;

 /// `LogMergePolicy` tries to merge segments that have a similar number of
 /// documents.
 #[derive(Debug, Clone)]
 pub struct LogMergePolicy {
-    min_merge_size: usize,
-    max_merge_size: usize,
+    min_num_segments: usize,
+    max_docs_before_merge: usize,
     min_layer_size: u32,
     level_log_size: f64,
 }
@@ -23,15 +24,16 @@ impl LogMergePolicy {
         cmp::max(self.min_layer_size, size)
     }

-    /// Set the minimum number of segment that may be merge together.
-    pub fn set_min_merge_size(&mut self, min_merge_size: usize) {
-        self.min_merge_size = min_merge_size;
+    /// Set the minimum number of segments that may be merged together.
+    pub fn set_min_num_segments(&mut self, min_num_segments: usize) {
+        self.min_num_segments = min_num_segments;
     }

     /// Set the maximum number docs in a segment for it to be considered for
-    /// merging.
-    pub fn set_max_merge_size(&mut self, max_merge_size: usize) {
-        self.max_merge_size = max_merge_size;
+    /// merging. A segment can still reach more than max_docs, by merging many
+    /// smaller ones.
+    pub fn set_max_docs_before_merge(&mut self, max_docs_merge_size: usize) {
+        self.max_docs_before_merge = max_docs_merge_size;
     }

     /// Set the minimum segment size under which all segment belong
@@ -42,7 +44,7 @@ impl LogMergePolicy {

     /// Set the ratio between two consecutive levels.
     ///
-    /// Segment are group in levels according to their sizes.
+    /// Segments are grouped in levels according to their sizes.
     /// These levels are defined as intervals of exponentially growing sizes.
     /// level_log_size define the factor by which one should multiply the limit
     /// to reach a level, in order to get the limit to reach the following
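
A worked example of the leveling arithmetic (illustrative numbers, not from the source): if the largest candidate segment holds 65_536 docs, its level opens at log2(65_536) = 16; with the default level_log_size of 0.75, every following (smaller) segment whose clipped log2 size is at least 16 - 0.75 = 15.25 (roughly 38_968 docs) joins that level, and the first segment below that threshold opens the next level.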
@@ -54,52 +56,43 @@ impl LogMergePolicy {

 impl MergePolicy for LogMergePolicy {
     fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
-        let mut size_sorted_tuples = segments
+        let mut size_sorted_segments = segments
             .iter()
-            .map(SegmentMeta::num_docs)
-            .enumerate()
-            .filter(|(_, s)| s <= &(self.max_merge_size as u32))
-            .collect::<Vec<(usize, u32)>>();
+            .filter(|segment_meta| segment_meta.num_docs() <= (self.max_docs_before_merge as u32))
+            .collect::<Vec<&SegmentMeta>>();

-        size_sorted_tuples.sort_by(|x, y| y.1.cmp(&(x.1)));
-
-        if size_sorted_tuples.len() <= 1 {
-            return Vec::new();
+        if size_sorted_segments.len() <= 1 {
+            return vec![];
         }
+        size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs()));

-        let size_sorted_log_tuples: Vec<_> = size_sorted_tuples
-            .into_iter()
-            .map(|(ind, num_docs)| (ind, f64::from(self.clip_min_size(num_docs)).log2()))
-            .collect();
-
-        if let Some(&(first_ind, first_score)) = size_sorted_log_tuples.first() {
-            let mut current_max_log_size = first_score;
-            let mut levels = vec![vec![first_ind]];
-            for &(ind, score) in (&size_sorted_log_tuples).iter().skip(1) {
-                if score < (current_max_log_size - self.level_log_size) {
-                    current_max_log_size = score;
-                    levels.push(Vec::new());
-                }
-                levels.last_mut().unwrap().push(ind);
+        let mut current_max_log_size = f64::MAX;
+        let mut levels = vec![];
+        for (_, merge_group) in &size_sorted_segments.into_iter().group_by(|segment| {
+            let segment_log_size = f64::from(self.clip_min_size(segment.num_docs())).log2();
+            if segment_log_size < (current_max_log_size - self.level_log_size) {
+                // update current_max_log_size to create a new group
+                current_max_log_size = segment_log_size;
             }
-            levels
-                .iter()
-                .filter(|level| level.len() >= self.min_merge_size)
-                .map(|ind_vec| {
-                    MergeCandidate(ind_vec.iter().map(|&ind| segments[ind].id()).collect())
-                })
-                .collect()
-        } else {
-            return vec![];
+            // return current_max_log_size to be grouped to the current group
+            current_max_log_size
+        }) {
+            levels.push(merge_group.collect::<Vec<&SegmentMeta>>());
         }
+
+        levels
+            .iter()
+            .filter(|level| level.len() >= self.min_num_segments)
+            .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
+            .collect()
     }
 }

 impl Default for LogMergePolicy {
     fn default() -> LogMergePolicy {
         LogMergePolicy {
-            min_merge_size: DEFAULT_MIN_MERGE_SIZE,
-            max_merge_size: DEFAULT_MAX_MERGE_SIZE,
+            min_num_segments: DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE,
+            max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE,
             min_layer_size: DEFAULT_MIN_LAYER_SIZE,
             level_log_size: DEFAULT_LEVEL_LOG_SIZE,
         }
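
A standalone sketch of the grouping trick introduced above (hedged: this is not tantivy code, and it assumes the group_by adapter of itertools ~0.10, the version era of this commit). Keying group_by on a threshold that the closure itself lowers partitions a descending-sorted list into exponentially spaced levels, exactly the shape of the new compute_merge_candidates:

use itertools::Itertools;

fn levels(mut sizes: Vec<u32>, level_log_size: f64) -> Vec<Vec<u32>> {
    // Sort descending, as the policy does with segment doc counts.
    sizes.sort_by_key(|&s| std::cmp::Reverse(s));
    let mut current_max_log_size = f64::MAX;
    let mut levels = Vec::new();
    for (_, group) in &sizes.into_iter().group_by(|&size| {
        let log_size = f64::from(size.max(1)).log2();
        if log_size < current_max_log_size - level_log_size {
            // Too small for the current level: open a new one.
            current_max_log_size = log_size;
        }
        // Consecutive items sharing this key land in the same group.
        current_max_log_size
    }) {
        levels.push(group.collect());
    }
    levels
}

fn main() {
    // 4096 (log2 = 12) and 4000 (log2 ~ 11.97) share a level;
    // 512 and 2 each open a new one.
    assert_eq!(
        levels(vec![4096, 2, 512, 4000], 0.75),
        vec![vec![4096, 4000], vec![512], vec![2]]
    );
}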
@@ -109,16 +102,79 @@ impl Default for LogMergePolicy {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::core::{SegmentId, SegmentMeta, SegmentMetaInventory};
-    use crate::indexer::merge_policy::MergePolicy;
+    use crate::{
+        core::{SegmentId, SegmentMeta, SegmentMetaInventory},
+        schema,
+    };
+    use crate::{indexer::merge_policy::MergePolicy, schema::INDEXED};
     use once_cell::sync::Lazy;

     static INVENTORY: Lazy<SegmentMetaInventory> = Lazy::new(SegmentMetaInventory::default);

+    use crate::Index;
+
+    #[test]
+    fn create_index_test_max_merge_issue_1035() {
+        let mut schema_builder = schema::Schema::builder();
+        let int_field = schema_builder.add_u64_field("intval", INDEXED);
+        let schema = schema_builder.build();
+
+        let index = Index::create_in_ram(schema);
+
+        {
+            let mut log_merge_policy = LogMergePolicy::default();
+            log_merge_policy.set_min_num_segments(1);
+            log_merge_policy.set_max_docs_before_merge(1);
+            log_merge_policy.set_min_layer_size(0);
+
+            let mut index_writer = index.writer_for_tests().unwrap();
+            index_writer.set_merge_policy(Box::new(log_merge_policy));
+
+            // After every commit the merge checker is started; because of
+            // max_docs_before_merge it will only merge segments holding a
+            // single document.
+            index_writer.add_document(doc!(int_field=>1_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>2_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>3_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>4_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>5_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>6_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>7_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>8_u64));
+            assert!(index_writer.commit().is_ok());
+        }
+
+        let _segment_ids = index
+            .searchable_segment_ids()
+            .expect("Searchable segments failed.");
+
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        let segment_readers = searcher.segment_readers();
+        for segment in segment_readers {
+            if segment.num_docs() > 2 {
+                panic!("segment can't have more than two docs");
+            } // no way to wait for the merge to finish; otherwise this could be a plain assert_eq
+        }
+    }
+
     fn test_merge_policy() -> LogMergePolicy {
         let mut log_merge_policy = LogMergePolicy::default();
-        log_merge_policy.set_min_merge_size(3);
-        log_merge_policy.set_max_merge_size(100_000);
+        log_merge_policy.set_min_num_segments(3);
+        log_merge_policy.set_max_docs_before_merge(100_000);
         log_merge_policy.set_min_layer_size(2);
         log_merge_policy
     }
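
Why the new test can insist on at most two docs per segment: with max_docs_before_merge = 1, only single-doc segments qualify as merge inputs, and a merged pair immediately falls out of consideration, so segments should not keep growing. Since the trailing comment notes there is no way to wait for a merge to complete, the test checks an upper bound rather than an exact segment layout.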
2 changes: 1 addition & 1 deletion src/indexer/merger.rs
@@ -1828,7 +1828,7 @@ mod tests {

         // Make sure we'll attempt to merge every created segment
         let mut policy = crate::indexer::LogMergePolicy::default();
-        policy.set_min_merge_size(2);
+        policy.set_min_num_segments(2);
         writer.set_merge_policy(Box::new(policy));

         for i in 0..100 {
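
All four touched files go through index_writer.set_merge_policy, which accepts any MergePolicy implementation. A hedged sketch of a trivial custom policy written against the trait shape visible in this diff (the tantivy::merge_policy paths, the public MergeCandidate tuple constructor, and the Debug supertrait are assumptions from upstream tantivy, not shown here):

use tantivy::merge_policy::{MergeCandidate, MergePolicy};
use tantivy::SegmentMeta;

#[derive(Debug, Clone, Default)]
struct MergeAllPolicy;

impl MergePolicy for MergeAllPolicy {
    fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
        // Propose a single candidate merging every segment, once there are two or more.
        if segments.len() < 2 {
            return vec![];
        }
        vec![MergeCandidate(segments.iter().map(|seg| seg.id()).collect())]
    }
}

Installed the same way as LogMergePolicy above: index_writer.set_merge_policy(Box::new(MergeAllPolicy));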
