From 93997a8b8e66da7a1a62411eda4a845f6c0ec781 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 18 May 2021 09:55:31 +0200 Subject: [PATCH] use itertools groupby, fix serialization test --- src/indexer/index_writer.rs | 2 +- src/indexer/log_merge_policy.rs | 41 ++++++++++++++++----------------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index dee4fefbc6..718f5203f1 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -945,7 +945,7 @@ mod tests { let index_writer = index.writer(3_000_000).unwrap(); assert_eq!( format!("{:?}", index_writer.get_merge_policy()), - "LogMergePolicy { min_merge_size: 8, max_merge_size: 10000000, min_layer_size: 10000, \ + "LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \ level_log_size: 0.75 }" ); let merge_policy = Box::new(NoMergePolicy::default()); diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 12b7b3edf4..68345aee78 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -1,5 +1,6 @@ use super::merge_policy::{MergeCandidate, MergePolicy}; use crate::core::SegmentMeta; +use itertools::Itertools; use std::cmp; use std::f64; @@ -65,29 +66,27 @@ impl MergePolicy for LogMergePolicy { } size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs())); - let sorted_segments_with_log_size: Vec<_> = size_sorted_segments - .into_iter() - .map(|seg| (seg, f64::from(self.clip_min_size(seg.num_docs())).log2())) - .collect(); - - if let Some(&(largest_segment, log_size)) = sorted_segments_with_log_size.first() { - let mut current_max_log_size = log_size; - let mut levels = vec![vec![largest_segment]]; - for &(segment, segment_log_size) in sorted_segments_with_log_size.iter().skip(1) { - if segment_log_size < (current_max_log_size - self.level_log_size) { - current_max_log_size = segment_log_size; - 
levels.push(Vec::new()); - } - levels.last_mut().unwrap().push(segment); + let mut current_max_log_size = f64::MAX; + let mut levels = vec![]; + for (_, merge_group) in &size_sorted_segments.into_iter().group_by(|segment| { + let segment_log_size = f64::from(self.clip_min_size(segment.num_docs())).log2(); + if segment_log_size < (current_max_log_size - self.level_log_size) { + // update current_max_log_size to create a new group + current_max_log_size = segment_log_size; + current_max_log_size + } else { + // return current_max_log_size to be grouped to the current group + current_max_log_size + } - } - levels - .iter() - .filter(|level| level.len() >= self.min_num_segments) - .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect())) - .collect() - } else { - return vec![]; + }) { + levels.push(merge_group.collect::<Vec<_>>()); + } + + levels + .iter() + .filter(|level| level.len() >= self.min_num_segments) + .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect())) + .collect() } }