From 6ba302c4819bd36ce40b43c6777f813b9f048030 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 23 Jun 2021 19:59:39 +0200 Subject: [PATCH] Use dynamic fastfield codes for multivalues fixes #1093 Use dynamic fastfield codes for multivalues fixes (only sorting case covered) Rename get to get_val due to conflict with Vec use u64 precision instead of u32 for get_range, to allow use of existing fast field interface interface (actually not sure if it would be better have a different interface) --- fastfield_codecs/src/lib.rs | 18 ++--- fastfield_codecs/src/linearinterpol.rs | 10 +-- fastfield_codecs/src/multilinearinterpol.rs | 11 +-- src/fastfield/multivalued/reader.rs | 14 ++-- src/fastfield/reader.rs | 8 +-- src/fastfield/readers.rs | 15 ++-- src/fastfield/serializer/mod.rs | 21 +++++- src/fastfield/writer.rs | 12 ++-- src/indexer/merger.rs | 79 +++++++++++++++++---- 9 files changed, 132 insertions(+), 56 deletions(-) diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index a79ebc3711..a5d67304e8 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -51,14 +51,14 @@ pub trait FastFieldCodecSerializer { /// FastFieldDataAccess is the trait to access fast field data during serialization and estimation. pub trait FastFieldDataAccess { - /// Return the value associated to the given document. + /// Return the value associated to the given position. /// /// Whenever possible use the Iterator passed to the fastfield creation instead, for performance reasons. /// /// # Panics /// - /// May panic if `doc` is greater than the segment - fn get(&self, doc: u32) -> u64; + /// May panic if `position` is greater than the index. + fn get_val(&self, position: u64) -> u64; } #[derive(Debug, Clone)] @@ -69,20 +69,20 @@ pub struct FastFieldStats { } impl<'a> FastFieldDataAccess for &'a [u64] { - fn get(&self, doc: u32) -> u64 { - self[doc as usize] + fn get_val(&self, position: u64) -> u64 { + self[position as usize] } } impl<'a> FastFieldDataAccess for &'a Vec { - fn get(&self, doc: u32) -> u64 { - self[doc as usize] + fn get_val(&self, position: u64) -> u64 { + self[position as usize] } } impl FastFieldDataAccess for Vec { - fn get(&self, doc: u32) -> u64 { - self[doc as usize] + fn get_val(&self, position: u64) -> u64 { + self[position as usize] } } diff --git a/fastfield_codecs/src/linearinterpol.rs b/fastfield_codecs/src/linearinterpol.rs index a9acaa3f7b..3a80656a16 100644 --- a/fastfield_codecs/src/linearinterpol.rs +++ b/fastfield_codecs/src/linearinterpol.rs @@ -123,8 +123,8 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer { ) -> io::Result<()> { assert!(stats.min_value <= stats.max_value); - let first_val = fastfield_accessor.get(0); - let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1); + let first_val = fastfield_accessor.get_val(0); + let last_val = fastfield_accessor.get_val(stats.num_vals as u64 - 1); let slope = get_slope(first_val, last_val, stats.num_vals); // calculate offset to ensure all values are positive let mut offset = 0; @@ -191,8 +191,8 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer { /// where the local maxima for the deviation of the calculated value are and /// the offset to shift all values to >=0 is also unknown. fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 { - let first_val = fastfield_accessor.get(0); - let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1); + let first_val = fastfield_accessor.get_val(0); + let last_val = fastfield_accessor.get_val(stats.num_vals as u64 - 1); let slope = get_slope(first_val, last_val, stats.num_vals); // let's sample at 0%, 5%, 10% .. 95%, 100% @@ -205,7 +205,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer { .iter() .map(|pos| { let calculated_value = get_calculated_value(first_val, *pos as u64, slope); - let actual_value = fastfield_accessor.get(*pos as u32); + let actual_value = fastfield_accessor.get_val(*pos as u64); distance(calculated_value, actual_value) }) .max() diff --git a/fastfield_codecs/src/multilinearinterpol.rs b/fastfield_codecs/src/multilinearinterpol.rs index 811b5fbdf2..45dfde5c24 100644 --- a/fastfield_codecs/src/multilinearinterpol.rs +++ b/fastfield_codecs/src/multilinearinterpol.rs @@ -196,8 +196,8 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer { ) -> io::Result<()> { assert!(stats.min_value <= stats.max_value); - let first_val = fastfield_accessor.get(0); - let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1); + let first_val = fastfield_accessor.get_val(0); + let last_val = fastfield_accessor.get_val(stats.num_vals as u64 - 1); let mut first_function = Function { end_pos: stats.num_vals, @@ -309,9 +309,10 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer { /// where the local maxima are for the deviation of the calculated value and /// the offset is also unknown. fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 { - let first_val_in_first_block = fastfield_accessor.get(0); + let first_val_in_first_block = fastfield_accessor.get_val(0); let last_elem_in_first_chunk = CHUNK_SIZE.min(stats.num_vals); - let last_val_in_first_block = fastfield_accessor.get(last_elem_in_first_chunk as u32 - 1); + let last_val_in_first_block = + fastfield_accessor.get_val(last_elem_in_first_chunk as u64 - 1); let slope = get_slope( first_val_in_first_block, last_val_in_first_block, @@ -328,7 +329,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer { .map(|pos| { let calculated_value = get_calculated_value(first_val_in_first_block, *pos as u64, slope); - let actual_value = fastfield_accessor.get(*pos as u32); + let actual_value = fastfield_accessor.get_val(*pos as u64); distance(calculated_value, actual_value) }) .max() diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs index ad6c99da66..60691a76f9 100644 --- a/src/fastfield/multivalued/reader.rs +++ b/src/fastfield/multivalued/reader.rs @@ -1,8 +1,6 @@ use std::ops::Range; -use crate::fastfield::{ - BitpackedFastFieldReader, DynamicFastFieldReader, FastFieldReader, FastValue, MultiValueLength, -}; +use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue, MultiValueLength}; use crate::DocId; /// Reader for a multivalued `u64` fast field. @@ -16,13 +14,13 @@ use crate::DocId; #[derive(Clone)] pub struct MultiValuedFastFieldReader { idx_reader: DynamicFastFieldReader, - vals_reader: BitpackedFastFieldReader, + vals_reader: DynamicFastFieldReader, } impl MultiValuedFastFieldReader { pub(crate) fn open( idx_reader: DynamicFastFieldReader, - vals_reader: BitpackedFastFieldReader, + vals_reader: DynamicFastFieldReader, ) -> MultiValuedFastFieldReader { MultiValuedFastFieldReader { idx_reader, @@ -32,6 +30,7 @@ impl MultiValuedFastFieldReader { /// Returns `(start, stop)`, such that the values associated /// to the given document are `start..stop`. + #[inline] fn range(&self, doc: DocId) -> Range { let start = self.idx_reader.get(doc); let stop = self.idx_reader.get(doc + 1); @@ -39,11 +38,12 @@ impl MultiValuedFastFieldReader { } /// Returns the array of values associated to the given `doc`. + #[inline] pub fn get_vals(&self, doc: DocId, vals: &mut Vec) { let range = self.range(doc); let len = (range.end - range.start) as usize; vals.resize(len, Item::make_zero()); - self.vals_reader.get_range_u64(range.start, &mut vals[..]); + self.vals_reader.get_range(range.start, &mut vals[..]); } /// Returns the minimum value for this fast field. @@ -65,12 +65,14 @@ impl MultiValuedFastFieldReader { } /// Returns the number of values associated with the document `DocId`. + #[inline] pub fn num_vals(&self, doc: DocId) -> usize { let range = self.range(doc); (range.end - range.start) as usize } /// Returns the overall number of values in this field . + #[inline] pub fn total_num_vals(&self) -> u64 { self.idx_reader.max_value() } diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index 30ae0aba88..47adcddfb7 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -44,7 +44,7 @@ pub trait FastFieldReader: Clone { /// /// May panic if `start + output.len()` is greater than /// the segment's `maxdoc`. - fn get_range(&self, start: DocId, output: &mut [Item]); + fn get_range(&self, start: u64, output: &mut [Item]); /// Returns the minimum value for this fast field. /// @@ -120,7 +120,7 @@ impl FastFieldReader for DynamicFastFieldReader { Self::MultiLinearInterpol(reader) => reader.get(doc), } } - fn get_range(&self, start: DocId, output: &mut [Item]) { + fn get_range(&self, start: u64, output: &mut [Item]) { match self { Self::Bitpacked(reader) => reader.get_range(start, output), Self::LinearInterpol(reader) => reader.get_range(start, output), @@ -226,8 +226,8 @@ impl FastFieldReader /// /// May panic if `start + output.len()` is greater than /// the segment's `maxdoc`. - fn get_range(&self, start: DocId, output: &mut [Item]) { - self.get_range_u64(u64::from(start), output); + fn get_range(&self, start: u64, output: &mut [Item]) { + self.get_range_u64(start, output); } /// Returns the minimum value for this fast field. diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index ccd91b61fe..f202bbb35a 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -99,22 +99,27 @@ impl FastFieldReaders { Ok(()) } - pub(crate) fn typed_fast_field_reader( + pub(crate) fn typed_fast_field_reader_with_idx( &self, field: Field, + index: usize, ) -> crate::Result> { - let fast_field_slice = self.fast_field_data(field, 0)?; + let fast_field_slice = self.fast_field_data(field, index)?; DynamicFastFieldReader::open(fast_field_slice) } + pub(crate) fn typed_fast_field_reader( + &self, + field: Field, + ) -> crate::Result> { + self.typed_fast_field_reader_with_idx(field, 0) + } pub(crate) fn typed_fast_field_multi_reader( &self, field: Field, ) -> crate::Result> { let idx_reader = self.typed_fast_field_reader(field)?; - let fast_field_slice_vals = self.fast_field_data(field, 1)?; - let vals_reader: BitpackedFastFieldReader = - BitpackedFastFieldReader::open(fast_field_slice_vals)?; + let vals_reader = self.typed_fast_field_reader_with_idx(field, 1)?; Ok(MultiValuedFastFieldReader::open(idx_reader, vals_reader)) } diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index d297c65ff0..a4c9fc0900 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -71,7 +71,26 @@ impl CompositeFastFieldSerializer { data_iter_1: impl Iterator, data_iter_2: impl Iterator, ) -> io::Result<()> { - let field_write = self.composite_write.for_field_with_idx(field, 0); + self.create_auto_detect_u64_fast_field_with_idx( + field, + stats, + fastfield_accessor, + data_iter_1, + data_iter_2, + 0, + ) + } + /// Serialize data into a new u64 fast field. The best compression codec will be chosen automatically. + pub fn create_auto_detect_u64_fast_field_with_idx( + &mut self, + field: Field, + stats: FastFieldStats, + fastfield_accessor: impl FastFieldDataAccess, + data_iter_1: impl Iterator, + data_iter_2: impl Iterator, + idx: usize, + ) -> io::Result<()> { + let field_write = self.composite_write.for_field_with_idx(field, idx); let mut estimations = vec![]; diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index a3b3bdb79b..9e98934540 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -7,7 +7,6 @@ use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema}; use crate::termdict::TermOrdinal; -use crate::DocId; use fnv::FnvHashMap; use std::collections::HashMap; use std::io; @@ -323,16 +322,17 @@ struct WriterFastFieldAccessProvider<'map, 'bitp> { vals: &'bitp BlockedBitpacker, } impl<'map, 'bitp> FastFieldDataAccess for WriterFastFieldAccessProvider<'map, 'bitp> { - /// Return the value associated to the given document. + /// Return the value associated to the given doc. /// - /// This accessor should return as fast as possible. + /// Whenever possible use the Iterator passed to the fastfield creation instead, for performance reasons. /// /// # Panics /// - /// May panic if `doc` is greater than the segment - fn get(&self, doc: DocId) -> u64 { + /// May panic if `doc` is greater than the index. + fn get_val(&self, doc: u64) -> u64 { if let Some(doc_id_map) = self.doc_id_map { - self.vals.get(doc_id_map.get_old_doc_id(doc) as usize) // consider extra FastFieldReader wrapper for non doc_id_map + self.vals + .get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra FastFieldReader wrapper for non doc_id_map } else { self.vals.get(doc as usize) } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 5810a6e554..f89f7f60fc 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -350,7 +350,7 @@ impl IndexMerger { fast_field_readers: &'a Vec>, } impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> { - fn get(&self, doc: DocId) -> u64 { + fn get_val(&self, doc: u64) -> u64 { let (doc_id, reader_with_ordinal) = self.doc_id_mapping[doc as usize]; self.fast_field_readers[reader_with_ordinal.ordinal as usize].get(doc_id) } @@ -518,7 +518,7 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &Option>, reader_and_field_accessors: &[(&SegmentReader, T)], - ) -> crate::Result<()> { + ) -> crate::Result> { let mut total_num_vals = 0u64; // In the first pass, we compute the total number of vals. // @@ -569,6 +569,7 @@ impl IndexMerger { offsets.iter().cloned(), offsets.iter().cloned(), )?; + Ok(offsets) } else { let mut offsets = vec![]; let mut offset = 0; @@ -587,16 +588,15 @@ impl IndexMerger { offsets.iter().cloned(), offsets.iter().cloned(), )?; + Ok(offsets) } - - Ok(()) } fn write_multi_value_fast_field_idx( &self, field: Field, fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &Option>, - ) -> crate::Result<()> { + ) -> crate::Result> { let reader_and_field_accessors = self.readers.iter().map(|reader|{ let u64s_reader: MultiValuedFastFieldReader = reader.fast_fields() .typed_fast_field_multi_reader(field) @@ -687,10 +687,12 @@ impl IndexMerger { // The second contains the actual values. // First we merge the idx fast field. - self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?; + let offsets = + self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?; let mut min_value = u64::max_value(); let mut max_value = u64::min_value(); + let mut num_vals = 0; let mut vals = Vec::with_capacity(100); @@ -716,6 +718,7 @@ impl IndexMerger { min_value = cmp::min(val, min_value); max_value = cmp::max(val, max_value); } + num_vals += vals.len(); } ff_readers.push(ff_reader); // TODO optimize when no deletes @@ -726,7 +729,7 @@ impl IndexMerger { max_value = 0; } - let fast_field_reader = self + let fast_field_readers = self .readers .iter() .map(|reader| { @@ -738,17 +741,63 @@ impl IndexMerger { .collect::>(); // We can now initialize our serializer, and push it the different values - let mut serialize_vals = - fast_field_serializer.new_u64_fast_field_with_idx(field, min_value, max_value, 1)?; + let stats = FastFieldStats { + max_value, + num_vals: num_vals as u64, + min_value, + }; if let Some(doc_id_mapping) = doc_id_mapping { - for (doc_id, reader_with_ordinal) in doc_id_mapping { - let ff_reader = &fast_field_reader[reader_with_ordinal.ordinal as usize]; - ff_reader.get_vals(*doc_id, &mut vals); - for &val in &vals { - serialize_vals.add_val(val)?; + struct SortedDocidFieldAccessProvider<'a> { + doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>, + fast_field_readers: &'a Vec>, + offsets: Vec, + } + impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> { + fn get_val(&self, pos: u64) -> u64 { + // use the offsets index to find the doc_id which will contain the position. + // the offsets are stricly increasing so we can do a simple search on it. + let new_docid = self.offsets.iter().position(|&x| x > pos).unwrap() - 1; + + // now we need to find the position of `pos` in the multivalued bucket + let num_pos_covered_until_now = self.offsets[new_docid]; + let pos_in_values = pos - num_pos_covered_until_now; + + let (old_doc_id, reader_with_ordinal) = self.doc_id_mapping[new_docid as usize]; + let num_vals = self.fast_field_readers[reader_with_ordinal.ordinal as usize] + .get_len(old_doc_id); + assert!(num_vals >= pos_in_values); + let mut vals = vec![]; + self.fast_field_readers[reader_with_ordinal.ordinal as usize] + .get_vals(old_doc_id, &mut vals); + + vals[pos_in_values as usize] } } + let fastfield_accessor = SortedDocidFieldAccessProvider { + doc_id_mapping, + fast_field_readers: &fast_field_readers, + offsets, + }; + let iter = doc_id_mapping + .iter() + .map(|(doc_id, reader_with_ordinal)| { + let ff_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize]; + let mut vals = vec![]; + ff_reader.get_vals(*doc_id, &mut vals); + vals.into_iter() + }) + .flatten(); + fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( + field, + stats, + fastfield_accessor, + iter.clone(), + iter, + 1, + )?; } else { + let mut serialize_vals = fast_field_serializer + .new_u64_fast_field_with_idx(field, min_value, max_value, 1)?; for (reader, ff_reader) in self.readers.iter().zip(ff_readers) { // TODO optimize if no deletes for doc in reader.doc_ids_alive() { @@ -758,8 +807,8 @@ impl IndexMerger { } } } + serialize_vals.close_field()?; } - serialize_vals.close_field()?; Ok(()) }