fix and refactor log merge policy, fixes #1035 (#1043)

Merged 6 commits on May 19, 2021
2 changes: 1 addition & 1 deletion src/directory/mmap_directory.rs
@@ -593,7 +593,7 @@ mod tests {
 
         let mut index_writer = index.writer_for_tests().unwrap();
         let mut log_merge_policy = LogMergePolicy::default();
-        log_merge_policy.set_min_merge_size(3);
+        log_merge_policy.set_min_num_segments(3);
         index_writer.set_merge_policy(Box::new(log_merge_policy));
         for _num_commits in 0..10 {
             for _ in 0..10 {
2 changes: 1 addition & 1 deletion src/indexer/index_writer.rs
@@ -945,7 +945,7 @@ mod tests {
         let index_writer = index.writer(3_000_000).unwrap();
         assert_eq!(
             format!("{:?}", index_writer.get_merge_policy()),
-            "LogMergePolicy { min_merge_size: 8, max_merge_size: 10000000, min_layer_size: 10000, \
+            "LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \
             level_log_size: 0.75 }"
         );
         let merge_policy = Box::new(NoMergePolicy::default());
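Because `LogMergePolicy` derives `Debug`, the renamed fields appear verbatim in the formatted output, which is why this assertion string had to change. A minimal standalone illustration of that mechanism (not tantivy code):

```rust
#[derive(Debug)]
struct Policy {
    min_num_segments: usize,
}

fn main() {
    // Derived Debug prints field names exactly as written in the struct, so
    // renaming a field changes the output and any test that asserts on it.
    let p = Policy { min_num_segments: 8 };
    assert_eq!(format!("{:?}", p), "Policy { min_num_segments: 8 }");
}
```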
154 changes: 105 additions & 49 deletions src/indexer/log_merge_policy.rs
@@ -1,19 +1,20 @@
 use super::merge_policy::{MergeCandidate, MergePolicy};
 use crate::core::SegmentMeta;
+use itertools::Itertools;
 use std::cmp;
 use std::f64;
 
 const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
 const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
-const DEFAULT_MIN_MERGE_SIZE: usize = 8;
-const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
+const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8;
[Collaborator review comment on this line: "Thank you!"]
+const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000;
 
 /// `LogMergePolicy` tries to merge segments that have a similar number of
 /// documents.
 #[derive(Debug, Clone)]
 pub struct LogMergePolicy {
-    min_merge_size: usize,
-    max_merge_size: usize,
+    min_num_segments: usize,
+    max_docs_before_merge: usize,
     min_layer_size: u32,
     level_log_size: f64,
 }
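For orientation, this is how the renamed knobs are used from the outside — a minimal sketch assuming the public re-export path `tantivy::merge_policy` (present in tantivy releases of this era); `example_policy` is just an illustrative name, and the values shown are the defaults:

```rust
use tantivy::merge_policy::LogMergePolicy;

fn example_policy() -> LogMergePolicy {
    let mut policy = LogMergePolicy::default();
    // Formerly set_min_merge_size: a level must contain at least this many
    // segments before it becomes a merge candidate.
    policy.set_min_num_segments(8);
    // Formerly set_max_merge_size: segments above this doc count are no
    // longer considered for merging (a merge of many smaller segments may
    // still produce a segment that exceeds it).
    policy.set_max_docs_before_merge(10_000_000);
    policy
}
```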
@@ -23,15 +24,16 @@ impl LogMergePolicy {
         cmp::max(self.min_layer_size, size)
     }
 
-    /// Set the minimum number of segment that may be merge together.
-    pub fn set_min_merge_size(&mut self, min_merge_size: usize) {
-        self.min_merge_size = min_merge_size;
+    /// Set the minimum number of segments that may be merged together.
+    pub fn set_min_num_segments(&mut self, min_num_segments: usize) {
+        self.min_num_segments = min_num_segments;
     }
 
     /// Set the maximum number docs in a segment for it to be considered for
-    /// merging.
-    pub fn set_max_merge_size(&mut self, max_merge_size: usize) {
-        self.max_merge_size = max_merge_size;
+    /// merging. A segment can still reach more than max_docs, by merging many
+    /// smaller ones.
+    pub fn set_max_docs_before_merge(&mut self, max_docs_merge_size: usize) {
+        self.max_docs_before_merge = max_docs_merge_size;
     }
 
     /// Set the minimum segment size under which all segment belong
@@ -42,7 +44,7 @@ impl LogMergePolicy {
 
     /// Set the ratio between two consecutive levels.
     ///
-    /// Segment are group in levels according to their sizes.
+    /// Segments are grouped in levels according to their sizes.
     /// These levels are defined as intervals of exponentially growing sizes.
     /// level_log_size define the factor by which one should multiply the limit
     /// to reach a level, in order to get the limit to reach the following
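To make the ratio concrete (an illustrative calculation, not part of the PR): with the default `level_log_size` of 0.75, a level's lower bound sits a factor of 2^0.75 ≈ 1.68 below its largest segment, so a 6,000-doc segment shares a level with a 10,000-doc one, while a 5,000-doc segment starts the next level:

```rust
fn main() {
    let level_log_size = 0.75_f64;
    // The level is anchored at its largest segment, here 10_000 docs.
    let boundary = (f64::from(10_000u32).log2() - level_log_size).exp2();
    println!("level boundary ≈ {:.0} docs", boundary); // ≈ 5946
    assert!(6_000_f64 > boundary); // same level as the 10_000-doc segment
    assert!(5_000_f64 < boundary); // falls into the next, smaller level
}
```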
@@ -54,52 +56,43 @@ impl LogMergePolicy {
 
 impl MergePolicy for LogMergePolicy {
     fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
-        let mut size_sorted_tuples = segments
+        let mut size_sorted_segments = segments
             .iter()
-            .map(SegmentMeta::num_docs)
-            .enumerate()
-            .filter(|(_, s)| s <= &(self.max_merge_size as u32))
-            .collect::<Vec<(usize, u32)>>();
-
-        size_sorted_tuples.sort_by(|x, y| y.1.cmp(&(x.1)));
-
-        if size_sorted_tuples.len() <= 1 {
-            return Vec::new();
-        }
-
-        let size_sorted_log_tuples: Vec<_> = size_sorted_tuples
-            .into_iter()
-            .map(|(ind, num_docs)| (ind, f64::from(self.clip_min_size(num_docs)).log2()))
-            .collect();
-
-        if let Some(&(first_ind, first_score)) = size_sorted_log_tuples.first() {
-            let mut current_max_log_size = first_score;
-            let mut levels = vec![vec![first_ind]];
-            for &(ind, score) in (&size_sorted_log_tuples).iter().skip(1) {
-                if score < (current_max_log_size - self.level_log_size) {
-                    current_max_log_size = score;
-                    levels.push(Vec::new());
-                }
-                levels.last_mut().unwrap().push(ind);
-            }
-            levels
-                .iter()
-                .filter(|level| level.len() >= self.min_merge_size)
-                .map(|ind_vec| {
-                    MergeCandidate(ind_vec.iter().map(|&ind| segments[ind].id()).collect())
-                })
-                .collect()
-        } else {
-            return vec![];
-        }
+            .filter(|segment_meta| segment_meta.num_docs() <= (self.max_docs_before_merge as u32))
+            .collect::<Vec<&SegmentMeta>>();
+
+        if size_sorted_segments.len() <= 1 {
+            return vec![];
+        }
+        size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs()));
+
+        let mut current_max_log_size = f64::MAX;
+        let mut levels = vec![];
+        for (_, merge_group) in &size_sorted_segments.into_iter().group_by(|segment| {
+            let segment_log_size = f64::from(self.clip_min_size(segment.num_docs())).log2();
+            if segment_log_size < (current_max_log_size - self.level_log_size) {
+                // update current_max_log_size to create a new group
+                current_max_log_size = segment_log_size;
+            }
+            // return current_max_log_size to be grouped to the current group
+            current_max_log_size
+        }) {
+            levels.push(merge_group.collect::<Vec<&SegmentMeta>>());
+        }
+
+        levels
+            .iter()
+            .filter(|level| level.len() >= self.min_num_segments)
+            .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
+            .collect()
     }
 }
 
 impl Default for LogMergePolicy {
     fn default() -> LogMergePolicy {
         LogMergePolicy {
-            min_merge_size: DEFAULT_MIN_MERGE_SIZE,
-            max_merge_size: DEFAULT_MAX_MERGE_SIZE,
+            min_num_segments: DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE,
+            max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE,
             min_layer_size: DEFAULT_MIN_LAYER_SIZE,
             level_log_size: DEFAULT_LEVEL_LOG_SIZE,
         }
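One detail worth spelling out: `Itertools::group_by` only groups *consecutive* elements whose keys compare equal, which is exactly what the rewrite relies on. The segments arrive sorted by descending size, and the key closure returns the running `current_max_log_size`, so every segment in a level yields the same key until a sufficiently smaller segment lowers it. A self-contained sketch of the same pattern (illustrative sizes, not tantivy code):

```rust
use itertools::Itertools;

fn main() {
    // Segment doc counts, sorted descending as compute_merge_candidates guarantees.
    let sizes: Vec<u32> = vec![1000, 900, 100, 90, 80, 10];
    let level_log_size = 0.75;
    let mut current_max_log_size = f64::MAX;
    let mut levels: Vec<Vec<u32>> = Vec::new();
    for (_, group) in &sizes.iter().group_by(|&&size| {
        let log_size = f64::from(size).log2();
        if log_size < current_max_log_size - level_log_size {
            // Too far below the level's largest member: open a new level.
            current_max_log_size = log_size;
        }
        current_max_log_size
    }) {
        levels.push(group.copied().collect());
    }
    assert_eq!(levels, vec![vec![1000, 900], vec![100, 90, 80], vec![10]]);
}
```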
@@ -109,16 +102,79 @@ impl Default for LogMergePolicy {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::core::{SegmentId, SegmentMeta, SegmentMetaInventory};
-    use crate::indexer::merge_policy::MergePolicy;
+    use crate::{
+        core::{SegmentId, SegmentMeta, SegmentMetaInventory},
+        schema,
+    };
+    use crate::{indexer::merge_policy::MergePolicy, schema::INDEXED};
     use once_cell::sync::Lazy;
 
     static INVENTORY: Lazy<SegmentMetaInventory> = Lazy::new(SegmentMetaInventory::default);
 
+    use crate::Index;
+
+    #[test]
+    fn create_index_test_max_merge_issue_1035() {
+        let mut schema_builder = schema::Schema::builder();
+        let int_field = schema_builder.add_u64_field("intval", INDEXED);
+        let schema = schema_builder.build();
+
+        let index = Index::create_in_ram(schema);
+
+        {
+            let mut log_merge_policy = LogMergePolicy::default();
+            log_merge_policy.set_min_num_segments(1);
+            log_merge_policy.set_max_docs_before_merge(1);
+            log_merge_policy.set_min_layer_size(0);
+
+            let mut index_writer = index.writer_for_tests().unwrap();
+            index_writer.set_merge_policy(Box::new(log_merge_policy));
+
+            // After every commit the merge checker runs; with max_docs_before_merge == 1
+            // it will only merge segments that contain a single document.
+            index_writer.add_document(doc!(int_field=>1_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>2_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>3_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>4_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>5_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>6_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>7_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>8_u64));
+            assert!(index_writer.commit().is_ok());
+        }
+
+        let _segment_ids = index
+            .searchable_segment_ids()
+            .expect("Searchable segments failed.");
+
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        let segment_readers = searcher.segment_readers();
+        for segment in segment_readers {
+            if segment.num_docs() > 2 {
+                panic!("segment must not have more than two docs");
+            } // no way to wait for the merge here, otherwise this could be a simple equality check
+        }
+    }
+
     fn test_merge_policy() -> LogMergePolicy {
         let mut log_merge_policy = LogMergePolicy::default();
-        log_merge_policy.set_min_merge_size(3);
-        log_merge_policy.set_max_merge_size(100_000);
+        log_merge_policy.set_min_num_segments(3);
+        log_merge_policy.set_max_docs_before_merge(100_000);
         log_merge_policy.set_min_layer_size(2);
         log_merge_policy
     }
2 changes: 1 addition & 1 deletion src/indexer/merger.rs
@@ -1828,7 +1828,7 @@ mod tests {
 
         // Make sure we'll attempt to merge every created segment
         let mut policy = crate::indexer::LogMergePolicy::default();
-        policy.set_min_merge_size(2);
+        policy.set_min_num_segments(2);
         writer.set_merge_policy(Box::new(policy));
 
         for i in 0..100 {