Skip to content

Commit

Permalink
use itertools groupby, fix serialization test
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed May 18, 2021
1 parent 6473a8c commit 93997a8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ mod tests {
let index_writer = index.writer(3_000_000).unwrap();
assert_eq!(
format!("{:?}", index_writer.get_merge_policy()),
"LogMergePolicy { min_merge_size: 8, max_merge_size: 10000000, min_layer_size: 10000, \
"LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \
level_log_size: 0.75 }"
);
let merge_policy = Box::new(NoMergePolicy::default());
Expand Down
41 changes: 20 additions & 21 deletions src/indexer/log_merge_policy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::merge_policy::{MergeCandidate, MergePolicy};
use crate::core::SegmentMeta;
use itertools::Itertools;
use std::cmp;
use std::f64;

Expand Down Expand Up @@ -65,29 +66,27 @@ impl MergePolicy for LogMergePolicy {
}
size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs()));

let sorted_segments_with_log_size: Vec<_> = size_sorted_segments
.into_iter()
.map(|seg| (seg, f64::from(self.clip_min_size(seg.num_docs())).log2()))
.collect();

if let Some(&(largest_segment, log_size)) = sorted_segments_with_log_size.first() {
let mut current_max_log_size = log_size;
let mut levels = vec![vec![largest_segment]];
for &(segment, segment_log_size) in sorted_segments_with_log_size.iter().skip(1) {
if segment_log_size < (current_max_log_size - self.level_log_size) {
current_max_log_size = segment_log_size;
levels.push(Vec::new());
}
levels.last_mut().unwrap().push(segment);
let mut current_max_log_size = f64::MAX;
let mut levels = vec![];
for (_, merge_group) in &size_sorted_segments.into_iter().group_by(|segment| {
let segment_log_size = f64::from(self.clip_min_size(segment.num_docs())).log2();
if segment_log_size < (current_max_log_size - self.level_log_size) {
// update current_max_log_size to create a new group
current_max_log_size = segment_log_size;
current_max_log_size
} else {
// return current_max_log_size to be grouped to the current group
current_max_log_size
}
levels
.iter()
.filter(|level| level.len() >= self.min_num_segments)
.map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
.collect()
} else {
return vec![];
}) {
levels.push(merge_group.collect::<Vec<&SegmentMeta>>());
}

levels
.iter()
.filter(|level| level.len() >= self.min_num_segments)
.map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
.collect()
}
}

Expand Down

0 comments on commit 93997a8

Please sign in to comment.