From 2dc5403e7b8e1cbb166582114414161694d1a1e6 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 23 Apr 2021 16:34:15 +0900 Subject: [PATCH] Closes #1022 --- CHANGELOG.md | 1 + src/core/inverted_index_reader.rs | 2 +- src/directory/mmap_directory.rs | 3 +- src/indexer/merger.rs | 2 +- src/lib.rs | 2 +- src/positions/mod.rs | 205 +++++++++++-------- src/positions/reader.rs | 56 +++-- src/positions/serializer.rs | 35 +++- src/postings/compression/mod.rs | 7 + src/postings/compression/vint.rs | 34 ++- src/postings/mod.rs | 2 +- src/postings/postings_writer.rs | 2 +- src/postings/recorder.rs | 19 +- src/postings/serializer.rs | 29 +-- src/termdict/fst_termdict/term_info_store.rs | 2 +- src/termdict/tests.rs | 2 +- 16 files changed, 257 insertions(+), 146 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab6477852a..164da9cb87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Tantivy 0.15.0 - Date field support for range queries (@rihardsk) #516 - Added lz4-flex as the default compression scheme in tantivy (@PSeitz) #1009 - Renamed a lot of symbols to avoid all uppercasing on acronyms, as per new clippy recommendation. For instance, RAMDireotory -> RamDirectory. 
(@pmasurel) +- Simplified positions index format (@fulmicoton) #1022 Tantivy 0.14.0 ========================= diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index 2f9515ca82..3371de1c3d 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -142,7 +142,7 @@ impl InvertedIndexReader { let positions_data = self .positions_file_slice .read_bytes_slice(term_info.positions_range.clone())?; - let position_reader = PositionReader::new(positions_data)?; + let position_reader = PositionReader::open(positions_data)?; Some(position_reader) } else { None diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 348b0b5cb1..9ce7668337 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -614,8 +614,9 @@ mod tests { reader.reload().unwrap(); let num_segments = reader.searcher().segment_readers().len(); assert!(num_segments <= 4); + let num_components_except_deletes = crate::core::SegmentComponent::iterator().len() - 1; assert_eq!( - num_segments * 7, + num_segments * num_components_except_deletes, mmap_directory.get_cache_info().mmapped.len() ); } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index e6d3be36bc..93099b50ce 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -628,7 +628,7 @@ impl IndexMerger { segment_postings.positions(&mut positions_buffer); let delta_positions = delta_computer.compute_delta(&positions_buffer); - field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions)?; + field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions); } doc = segment_postings.advance(); diff --git a/src/lib.rs b/src/lib.rs index 58833a3a25..1653c756e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -141,7 +141,7 @@ pub mod collector; pub mod directory; pub mod fastfield; pub mod fieldnorm; -pub(crate) mod positions; +pub mod positions; pub mod postings; pub mod query; pub mod schema; diff --git 
a/src/positions/mod.rs b/src/positions/mod.rs index 2c370a868d..d1ef323d53 100644 --- a/src/positions/mod.rs +++ b/src/positions/mod.rs @@ -1,28 +1,29 @@ -/// Positions are stored in three parts and over two files. -// -/// The `SegmentComponent::Positions` file contains all of the bitpacked positions delta, -/// for all terms of a given field, one term after the other. -/// -/// If the last block is incomplete, it is simply padded with zeros. -/// It cannot be read alone, as it actually does not contain the number of bits used for -/// each blocks. -/// . -/// Each block is serialized one after the other. -/// If the last block is incomplete, it is simply padded with zeros. -/// -/// -/// The `SegmentComponent::PositionsSKIP` file contains the number of bits used in each block in `u8` -/// stream. -/// -/// This makes it possible to rapidly skip over `n positions`. -/// -/// For every block #n where n = k * `LONG_SKIP_INTERVAL` blocks (k>=1), we also store -/// in this file the sum of number of bits used for all of the previous block (blocks `[0, n[`). -/// That is useful to start reading the positions for a given term: The TermInfo contains -/// an address in the positions stream, expressed in "number of positions". -/// The long skip structure makes it possible to skip rapidly to the a checkpoint close to this -/// value, and then skip normally. -/// +//! Tantivy can (if instructed to do so in the schema) store the term positions in a given field. +//! These positions are expressed as token ordinals. For instance, +//! in "The beauty and the beast", the term "the" appears in position 0 and position 4. +//! This information is useful to run phrase queries. +//! +//! The `SegmentComponent::POSITIONS` file contains all of the bitpacked positions delta, +//! for all terms of a given field, one term after the other. +//! +//! Each term is encoded independently. +//! 
Like for posting lists, tantivy relies on SIMD bitpacking to encode the positions delta in blocks of 128 deltas. +//! Because we rarely have a multiple of 128, a final block may encode the remaining values using variable byte encoding. +//! +//! In order to make reading possible, the term delta positions first encodes the number of bitpacked blocks, +//! then the bitwidth for each block, then the actual bitpacked blocks and finally the final variable int encoded block. +//! +//! Contrary to postings lists, the reader does not have access to the number of positions that is encoded, and instead +//! stops decoding the last block when its byte slice has been entirely read. +//! +//! More formally: +//! * *Positions* := *NumBitPackedBlocks* *BitPackedPositionBlock*^(P/128) *BitPackedPositionsDeltaBitWidth* *VIntPosDeltas*? +//! * *NumBitPackedBlocks* := *P* / 128 encoded as a variable byte integer. +//! * *BitPackedPositionBlock* := bit width encoded block of 128 positions delta +//! * *BitPackedPositionsDeltaBitWidth* := (*BitWidth*: u8)^*NumBitPackedBlocks* +//! * *VIntPosDeltas* := *VIntPosDelta*^(*P* % 128). +//! +//! Encoding the skip widths separately makes it easy and fast to rapidly skip over n positions. 
mod reader; mod serializer; @@ -38,42 +39,96 @@ pub mod tests { use super::PositionSerializer; use crate::directory::OwnedBytes; use crate::positions::reader::PositionReader; + use proptest::prelude::*; + use proptest::sample::select; use std::iter; - fn create_positions_data(vals: &[u32]) -> OwnedBytes { + fn create_positions_data(vals: &[u32]) -> crate::Result { let mut positions_buffer = vec![]; - { - let mut serializer = PositionSerializer::new(&mut positions_buffer); - for &val in vals { - serializer.write_all(&[val]).unwrap(); + let mut serializer = PositionSerializer::new(&mut positions_buffer); + serializer.write_positions_delta(&vals); + serializer.close_term()?; + serializer.close()?; + Ok(OwnedBytes::new(positions_buffer)) + } + + fn gen_delta_positions() -> BoxedStrategy> { + select(&[0, 1, 70, 127, 128, 129, 200, 255, 256, 257, 270][..]) + .prop_flat_map(|num_delta_positions| { + proptest::collection::vec( + select(&[1u32, 2u32, 4u32, 8u32, 16u32][..]), + num_delta_positions, + ) + }) + .boxed() + } + + proptest! 
{ + #[test] + fn test_position_delta(delta_positions in gen_delta_positions()) { + let delta_positions_data = create_positions_data(&delta_positions).unwrap(); + let mut position_reader = PositionReader::open(delta_positions_data).unwrap(); + let mut minibuf = [0u32; 1]; + for (offset, &delta_position) in delta_positions.iter().enumerate() { + position_reader.read(offset as u64, &mut minibuf[..]); + assert_eq!(delta_position, minibuf[0]); } - serializer.close_term().unwrap(); - serializer.close().unwrap(); } - OwnedBytes::new(positions_buffer) } #[test] - fn test_position_read() { - let v: Vec = (0..1000).collect(); - let positions_data = create_positions_data(&v[..]); + fn test_position_read() -> crate::Result<()> { + let position_deltas: Vec = (0..1000).collect(); + let positions_data = create_positions_data(&position_deltas[..])?; assert_eq!(positions_data.len(), 1224); - let mut position_reader = PositionReader::new(positions_data).unwrap(); + let mut position_reader = PositionReader::open(positions_data)?; for &n in &[1, 10, 127, 128, 130, 312] { let mut v = vec![0u32; n]; position_reader.read(0, &mut v[..]); for i in 0..n { - assert_eq!(v[i], i as u32); + assert_eq!(position_deltas[i], i as u32); } } + Ok(()) } #[test] - fn test_position_read_with_offset() { - let v: Vec = (0..1000).collect(); - let positions_data = create_positions_data(&v[..]); + fn test_empty_position() -> crate::Result<()> { + let mut positions_buffer = vec![]; + let mut serializer = PositionSerializer::new(&mut positions_buffer); + serializer.close_term()?; + serializer.close()?; + let position_delta = OwnedBytes::new(positions_buffer); + assert!(PositionReader::open(position_delta).is_ok()); + Ok(()) + } + + #[test] + fn test_multiple_write_positions() -> crate::Result<()> { + let mut positions_buffer = vec![]; + let mut serializer = PositionSerializer::new(&mut positions_buffer); + serializer.write_positions_delta(&[1u32, 12u32]); + serializer.write_positions_delta(&[4u32, 17u32]); + 
serializer.write_positions_delta(&[443u32]); + serializer.close_term()?; + serializer.close()?; + let position_delta = OwnedBytes::new(positions_buffer); + let mut output_delta_pos_buffer = vec![0u32; 5]; + let mut position_reader = PositionReader::open(position_delta)?; + position_reader.read(0, &mut output_delta_pos_buffer[..]); + assert_eq!( + &output_delta_pos_buffer[..], + &[1u32, 12u32, 4u32, 17u32, 443u32] + ); + Ok(()) + } + + #[test] + fn test_position_read_with_offset() -> crate::Result<()> { + let position_deltas: Vec = (0..1000).collect(); + let positions_data = create_positions_data(&position_deltas[..])?; assert_eq!(positions_data.len(), 1224); - let mut position_reader = PositionReader::new(positions_data).unwrap(); + let mut position_reader = PositionReader::open(positions_data)?; for &offset in &[1u64, 10u64, 127u64, 128u64, 130u64, 312u64] { for &len in &[1, 10, 130, 500] { let mut v = vec![0u32; len]; @@ -83,15 +138,16 @@ pub mod tests { } } } + Ok(()) } #[test] - fn test_position_read_after_skip() { - let v: Vec = (0..1_000).collect(); - let positions_data = create_positions_data(&v[..]); + fn test_position_read_after_skip() -> crate::Result<()> { + let position_deltas: Vec = (0..1_000).collect(); + let positions_data = create_positions_data(&position_deltas[..])?; assert_eq!(positions_data.len(), 1224); - let mut position_reader = PositionReader::new(positions_data).unwrap(); + let mut position_reader = PositionReader::open(positions_data)?; let mut buf = [0u32; 7]; let mut c = 0; @@ -105,14 +161,15 @@ pub mod tests { c += 1; } } + Ok(()) } #[test] - fn test_position_reread_anchor_different_than_block() { + fn test_position_reread_anchor_different_than_block() -> crate::Result<()> { let positions_delta: Vec = (0..2_000_000).collect(); - let positions_data = create_positions_data(&positions_delta[..]); + let positions_data = create_positions_data(&positions_delta[..])?; assert_eq!(positions_data.len(), 5003499); - let mut position_reader = 
PositionReader::new(positions_data.clone()).unwrap(); + let mut position_reader = PositionReader::open(positions_data.clone())?; let mut buf = [0u32; 256]; position_reader.read(128, &mut buf); for i in 0..256 { @@ -122,57 +179,40 @@ pub mod tests { for i in 0..256 { assert_eq!(buf[i], (128 + i) as u32); } + Ok(()) } #[test] - #[should_panic(expected = "offset arguments should be increasing.")] - fn test_position_panic_if_called_previous_anchor() { - let positions_delta: Vec = (0..2_000_000).collect(); - let positions_data = create_positions_data(&positions_delta[..]); - assert_eq!(positions_data.len(), 5_003_499); + fn test_position_requesting_passed_block() -> crate::Result<()> { + let positions_delta: Vec = (0..512).collect(); + let positions_data = create_positions_data(&positions_delta[..])?; + assert_eq!(positions_data.len(), 533); let mut buf = [0u32; 1]; - let mut position_reader = PositionReader::new(positions_data).unwrap(); + let mut position_reader = PositionReader::open(positions_data)?; position_reader.read(230, &mut buf); + assert_eq!(buf[0], 230); position_reader.read(9, &mut buf); + assert_eq!(buf[0], 9); + Ok(()) } #[test] - fn test_positions_bug() { - let mut positions_delta: Vec = vec![]; - for i in 1..200 { - for j in 0..i { - positions_delta.push(j); - } - } - let positions_data = create_positions_data(&positions_delta[..]); - let mut buf = Vec::new(); - let mut position_reader = PositionReader::new(positions_data).unwrap(); - let mut offset = 0; - for i in 1..24 { - buf.resize(i, 0); - offset += i as u64; - position_reader.read(offset, &mut buf[..]); - let expected_positions_delta: Vec = (0..i as u32).collect(); - assert_eq!(buf, &expected_positions_delta[..], "Failed for offset={},i={}", offset, i); - } - } - - #[test] - fn test_position() { + fn test_position() -> crate::Result<()> { const CONST_VAL: u32 = 9u32; let positions_delta: Vec = iter::repeat(CONST_VAL).take(2_000_000).collect(); - let positions_data = 
create_positions_data(&positions_delta[..]); + let positions_data = create_positions_data(&positions_delta[..])?; assert_eq!(positions_data.len(), 1_015_627); - let mut position_reader = PositionReader::new(positions_data).unwrap(); + let mut position_reader = PositionReader::open(positions_data)?; let mut buf = [0u32; 1]; position_reader.read(0, &mut buf); assert_eq!(buf[0], CONST_VAL); + Ok(()) } #[test] - fn test_position_advance() { + fn test_position_advance() -> crate::Result<()> { let positions_delta: Vec = (0..2_000_000).collect(); - let positions_data = create_positions_data(&positions_delta[..]); + let positions_data = create_positions_data(&positions_delta[..])?; assert_eq!(positions_data.len(), 5_003_499); for &offset in &[ 10, @@ -181,10 +221,11 @@ pub mod tests { 128 * 1024 + 7, 128 * 10 * 1024 + 10, ] { - let mut position_reader = PositionReader::new(positions_data.clone()).unwrap(); + let mut position_reader = PositionReader::open(positions_data.clone())?; let mut buf = [0u32; 1]; position_reader.read(offset, &mut buf); assert_eq!(buf[0], offset as u32); } + Ok(()) } } diff --git a/src/positions/reader.rs b/src/positions/reader.rs index 08aee66260..5a046ad1da 100644 --- a/src/positions/reader.rs +++ b/src/positions/reader.rs @@ -31,21 +31,38 @@ pub struct PositionReader { // // As we advance, anchor increases simultaneously with bit_widths and positions get consumed. anchor_offset: u64, + + // These are just copies used for .reset(). + original_bit_widths: OwnedBytes, + original_positions: OwnedBytes, } impl PositionReader { - pub fn new(mut positions_data: OwnedBytes) -> io::Result { + /// Open and reads the term positions encoded into the positions_data owned bytes. 
+ pub fn open(mut positions_data: OwnedBytes) -> io::Result { let num_positions_bitpacked_blocks = VInt::deserialize(&mut positions_data)?.0 as usize; let (bit_widths, positions) = positions_data.split(num_positions_bitpacked_blocks); Ok(PositionReader { - bit_widths, - positions, + bit_widths: bit_widths.clone(), + positions: positions.clone(), block_decoder: BlockDecoder::default(), block_offset: std::i64::MAX as u64, anchor_offset: 0u64, + original_bit_widths: bit_widths, + original_positions: positions, }) } + fn reset(&mut self) { + self.positions = self.original_positions.clone(); + self.bit_widths = self.original_bit_widths.clone(); + self.block_offset = std::i64::MAX as u64; + self.anchor_offset = 0u64; + } + + /// Advance from num_blocks bitpacked blocks. + /// + /// Panics if there are not that many remaining blocks. fn advance_num_blocks(&mut self, num_blocks: usize) { let num_bits: usize = self.bit_widths.as_ref()[..num_blocks] .iter() @@ -63,29 +80,33 @@ impl PositionReader { /// block_rel_id = i means the ith block after the anchor block. fn load_block(&mut self, block_rel_id: usize) { let bit_widths = self.bit_widths.as_slice(); - let byte_offset: usize = bit_widths[0..block_rel_id].iter().map(|&b| b as usize).sum::() * COMPRESSION_BLOCK_SIZE / 8; + let byte_offset: usize = bit_widths[0..block_rel_id] + .iter() + .map(|&b| b as usize) + .sum::() + * COMPRESSION_BLOCK_SIZE + / 8; let compressed_data = &self.positions.as_slice()[byte_offset..]; if bit_widths.len() > block_rel_id { // that block is bitpacked. let bit_width = bit_widths[block_rel_id]; - self.block_decoder.uncompress_block_unsorted(compressed_data, bit_width); + self.block_decoder + .uncompress_block_unsorted(compressed_data, bit_width); } else { // that block is vint encoded. 
- unimplemented!(); - // self.block_decoder.uncompress_vint_unsorted(compressed_data); + self.block_decoder + .uncompress_vint_unsorted_until_end(compressed_data); } self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64; - } + } /// Fills a buffer with the positions `[offset..offset+output.len())` integers. /// - /// `offset` is required to have a value >= to the offsets given in previous calls - /// for the given `PositionReaderAbsolute` instance. + /// This function is optimized to be called with increasing values of `offset`. pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) { - assert!( - offset >= self.anchor_offset, - "offset arguments should be increasing." - ); + if offset < self.anchor_offset { + self.reset(); + } let delta_to_block_offset = offset as i64 - self.block_offset as i64; if !(0..128).contains(&delta_to_block_offset) { // The first position is not within the first block. @@ -111,10 +132,13 @@ impl PositionReader { let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE; let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block; if remaining_in_block >= output.len() { - output.copy_from_slice(&self.block_decoder.output_array()[offset_in_block..][..output.len()]); + output.copy_from_slice( + &self.block_decoder.output_array()[offset_in_block..][..output.len()], + ); break; } - output[..remaining_in_block].copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]); + output[..remaining_in_block] + .copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]); output = &mut output[remaining_in_block..]; // we load block #i if necessary. 
offset += remaining_in_block as u64; diff --git a/src/positions/serializer.rs b/src/positions/serializer.rs index 80a1207720..45a06c5c8e 100644 --- a/src/positions/serializer.rs +++ b/src/positions/serializer.rs @@ -4,6 +4,10 @@ use crate::postings::compression::BlockEncoder; use crate::postings::compression::VIntEncoder; use std::io::{self, Write}; +/// The PositionSerializer is in charge of serializing all of the positions +/// of all of the terms of a given field. +/// +/// It is valid to call write_positions_delta more than once per term. pub struct PositionSerializer { block_encoder: BlockEncoder, positions_wrt: CountingWriter, @@ -13,6 +17,7 @@ pub struct PositionSerializer { } impl PositionSerializer { + /// Creates a new PositionSerializer writing into the given positions_wrt. pub fn new(positions_wrt: W) -> PositionSerializer { PositionSerializer { block_encoder: BlockEncoder::new(), @@ -23,7 +28,13 @@ impl PositionSerializer { } } - pub fn offset(&self) -> u64 { + /// Returns the number of bytes written in the positions write object + /// at this point. + /// When called before writing the positions of a term, this value is used as + /// the start offset. + /// When called after writing the positions of a term, this value is used as + /// the end offset. + pub fn written_bytes(&self) -> u64 { self.positions_wrt.written_bytes() } @@ -31,39 +42,40 @@ impl PositionSerializer { COMPRESSION_BLOCK_SIZE - self.block.len() } - pub fn write_all(&mut self, mut positions_delta: &[u32]) -> io::Result<()> { + /// Writes all of the given positions delta. 
+ pub fn write_positions_delta(&mut self, mut positions_delta: &[u32]) { while !positions_delta.is_empty() { let remaining_block_len = self.remaining_block_len(); let num_to_write = remaining_block_len.min(positions_delta.len()); self.block.extend(&positions_delta[..num_to_write]); positions_delta = &positions_delta[num_to_write..]; if self.remaining_block_len() == 0 { - self.flush_block()?; + self.flush_block(); } } - Ok(()) } - fn flush_block(&mut self) -> io::Result<()> { + fn flush_block(&mut self) { // encode the positions in the block if self.block.is_empty() { - return Ok(()); - } else if self.block.len() == COMPRESSION_BLOCK_SIZE { + return; + } + if self.block.len() == COMPRESSION_BLOCK_SIZE { let (bit_width, block_encoded): (u8, &[u8]) = self.block_encoder.compress_block_unsorted(&self.block[..]); self.bit_widths.push(bit_width); - self.positions_buffer.write_all(block_encoded)?; + self.positions_buffer.extend(block_encoded); } else { debug_assert!(self.block.len() < COMPRESSION_BLOCK_SIZE); let block_vint_encoded = self.block_encoder.compress_vint_unsorted(&self.block[..]); - self.positions_buffer.write_all(block_vint_encoded)?; + self.positions_buffer.extend_from_slice(block_vint_encoded); } self.block.clear(); - Ok(()) } + /// Close the positions for the given term. pub fn close_term(&mut self) -> io::Result<()> { - self.flush_block()?; + self.flush_block(); VInt(self.bit_widths.len() as u64).serialize(&mut self.positions_wrt)?; self.positions_wrt.write_all(&self.bit_widths[..])?; self.positions_wrt.write_all(&self.positions_buffer)?; @@ -72,6 +84,7 @@ impl PositionSerializer { Ok(()) } + /// Close the positions for this term and flushes the data. 
pub fn close(mut self) -> io::Result<()> { self.positions_wrt.flush() } diff --git a/src/postings/compression/mod.rs b/src/postings/compression/mod.rs index 5fd5d6a900..e652225036 100644 --- a/src/postings/compression/mod.rs +++ b/src/postings/compression/mod.rs @@ -167,6 +167,8 @@ pub trait VIntDecoder { num_els: usize, padding: u32, ) -> usize; + + fn uncompress_vint_unsorted_until_end(&mut self, compressed_data: &[u8]); } impl VIntEncoder for BlockEncoder { @@ -202,6 +204,11 @@ impl VIntDecoder for BlockDecoder { self.output.0.iter_mut().for_each(|el| *el = padding); vint::uncompress_unsorted(compressed_data, &mut self.output.0[..num_els]) } + + fn uncompress_vint_unsorted_until_end(&mut self, compressed_data: &[u8]) { + let num_els = vint::uncompress_unsorted_until_end(compressed_data, &mut self.output.0); + self.output_len = num_els; + } } #[cfg(test)] diff --git a/src/postings/compression/vint.rs b/src/postings/compression/vint.rs index 94248ff76d..f85784802a 100644 --- a/src/postings/compression/vint.rs +++ b/src/postings/compression/vint.rs @@ -63,13 +63,13 @@ pub fn uncompress_sorted(compressed_data: &[u8], output: &mut [u32], offset: u32 #[inline] pub(crate) fn uncompress_unsorted(compressed_data: &[u8], output_arr: &mut [u32]) -> usize { - let mut read_byte = 0; + let mut num_read_bytes = 0; for output_mut in output_arr.iter_mut() { let mut result = 0u32; let mut shift = 0u32; loop { - let cur_byte = compressed_data[read_byte]; - read_byte += 1; + let cur_byte = compressed_data[num_read_bytes]; + num_read_bytes += 1; result += u32::from(cur_byte % 128u8) << shift; if cur_byte & 128u8 != 0u8 { break; @@ -78,5 +78,31 @@ pub(crate) fn uncompress_unsorted(compressed_data: &[u8], output_arr: &mut [u32] } *output_mut = result; } - read_byte + num_read_bytes +} + +#[inline] +pub(crate) fn uncompress_unsorted_until_end( + compressed_data: &[u8], + output_arr: &mut [u32], +) -> usize { + let mut num_read_bytes = 0; + for (num_ints_written, output_mut) in 
output_arr.iter_mut().enumerate() { + if compressed_data.len() == num_read_bytes { + return num_ints_written; + } + let mut result = 0u32; + let mut shift = 0u32; + loop { + let cur_byte = compressed_data[num_read_bytes]; + num_read_bytes += 1; + result += u32::from(cur_byte % 128u8) << shift; + if cur_byte & 128u8 != 0u8 { + break; + } + shift += 7; + } + *output_mut = result; + } + output_arr.len() } diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 88b650ccbc..6185532929 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -68,7 +68,7 @@ pub mod tests { field_serializer.new_term("abc".as_bytes(), 12u32)?; for doc_id in 0u32..120u32 { let delta_positions = vec![1, 2, 3, 2]; - field_serializer.write_doc(doc_id, 4, &delta_positions)?; + field_serializer.write_doc(doc_id, 4, &delta_positions); } field_serializer.close_term()?; mem::drop(field_serializer); diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 81f6acc0ea..959ec2bb29 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -313,7 +313,7 @@ impl PostingsWriter for SpecializedPostingsWriter let recorder: Rec = termdict_heap.read(addr); let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32); serializer.new_term(&term_bytes[4..], term_doc_freq)?; - recorder.serialize(&mut buffer_lender, serializer, heap)?; + recorder.serialize(&mut buffer_lender, serializer, heap); serializer.close_term()?; } Ok(()) diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 322597d98e..8da4322c8d 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -2,7 +2,6 @@ use super::stacker::{ExpUnrolledLinkedList, MemoryArena}; use crate::common::{read_u32_vint, write_u32_vint}; use crate::postings::FieldSerializer; use crate::DocId; -use std::io; const POSITION_END: u32 = 0; @@ -74,7 +73,7 @@ pub(crate) trait Recorder: Copy + 'static { buffer_lender: &mut BufferLender, serializer: &mut FieldSerializer<'_>, heap: 
&MemoryArena, - ) -> io::Result<()>; + ); /// Returns the number of document containing this term. /// /// Returns `None` if not available. @@ -114,14 +113,13 @@ impl Recorder for NothingRecorder { buffer_lender: &mut BufferLender, serializer: &mut FieldSerializer<'_>, heap: &MemoryArena, - ) -> io::Result<()> { + ) { let buffer = buffer_lender.lend_u8(); self.stack.read_to_end(heap, buffer); // TODO avoid reading twice. for doc in VInt32Reader::new(&buffer[..]) { - serializer.write_doc(doc as u32, 0u32, &[][..])?; + serializer.write_doc(doc as u32, 0u32, &[][..]); } - Ok(()) } fn term_doc_freq(&self) -> Option { @@ -173,16 +171,14 @@ impl Recorder for TermFrequencyRecorder { buffer_lender: &mut BufferLender, serializer: &mut FieldSerializer<'_>, heap: &MemoryArena, - ) -> io::Result<()> { + ) { let buffer = buffer_lender.lend_u8(); self.stack.read_to_end(heap, buffer); let mut u32_it = VInt32Reader::new(&buffer[..]); while let Some(doc) = u32_it.next() { let term_freq = u32_it.next().unwrap_or(self.current_tf); - serializer.write_doc(doc as u32, term_freq, &[][..])?; + serializer.write_doc(doc as u32, term_freq, &[][..]); } - - Ok(()) } fn term_doc_freq(&self) -> Option { @@ -229,7 +225,7 @@ impl Recorder for TfAndPositionRecorder { buffer_lender: &mut BufferLender, serializer: &mut FieldSerializer<'_>, heap: &MemoryArena, - ) -> io::Result<()> { + ) { let (buffer_u8, buffer_positions) = buffer_lender.lend_all(); self.stack.read_to_end(heap, buffer_u8); let mut u32_it = VInt32Reader::new(&buffer_u8[..]); @@ -248,9 +244,8 @@ impl Recorder for TfAndPositionRecorder { } } } - serializer.write_doc(doc, buffer_positions.len() as u32, &buffer_positions)?; + serializer.write_doc(doc, buffer_positions.len() as u32, &buffer_positions); } - Ok(()) } fn term_doc_freq(&self) -> Option { diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 5a63e7c429..142e994604 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -157,11 +157,11 @@ 
impl<'a> FieldSerializer<'a> { fn current_term_info(&self) -> TermInfo { let positions_start = if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() { - positions_serializer.offset() + positions_serializer.written_bytes() } else { 0u64 } as usize; - let addr = self.postings_serializer.addr() as usize; + let addr = self.postings_serializer.written_bytes() as usize; TermInfo { doc_freq: 0, postings_range: addr..addr, @@ -198,18 +198,12 @@ impl<'a> FieldSerializer<'a> { /// /// Term frequencies and positions may be ignored by the serializer depending /// on the configuration of the field in the `Schema`. - pub fn write_doc( - &mut self, - doc_id: DocId, - term_freq: u32, - position_deltas: &[u32], - ) -> io::Result<()> { + pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32, position_deltas: &[u32]) { self.current_term_info.doc_freq += 1; self.postings_serializer.write_doc(doc_id, term_freq); if let Some(ref mut positions_serializer) = self.positions_serializer_opt.as_mut() { - positions_serializer.write_all(position_deltas)?; + positions_serializer.write_positions_delta(position_deltas); } - Ok(()) } /// Finish the serialization for this term postings. 
@@ -220,11 +214,14 @@ impl<'a> FieldSerializer<'a> { if self.term_open { self.postings_serializer .close_term(self.current_term_info.doc_freq)?; + self.current_term_info.postings_range.end = + self.postings_serializer.written_bytes() as usize; + if let Some(positions_serializer) = self.positions_serializer_opt.as_mut() { positions_serializer.close_term()?; + self.current_term_info.positions_range.end = + positions_serializer.written_bytes() as usize; } - self.current_term_info.postings_range.end = self.postings_serializer.addr() as usize; - self.current_term_info.positions_range.end = self.postings_serializer.addr() as usize; self.term_dictionary_builder .insert_value(&self.current_term_info)?; self.term_open = false; @@ -455,7 +452,13 @@ impl PostingsSerializer { Ok(()) } - fn addr(&self) -> u64 { + /// Returns the number of bytes written in the postings write object + /// at this point. + /// When called before writing the postings of a term, this value is used as + /// start offset. + /// When called after writing the postings of a term, this value is used as a + /// end offset. 
+ fn written_bytes(&self) -> u64 { self.output_write.written_bytes() as u64 } diff --git a/src/termdict/fst_termdict/term_info_store.rs b/src/termdict/fst_termdict/term_info_store.rs index 8e85a66414..905c56a960 100644 --- a/src/termdict/fst_termdict/term_info_store.rs +++ b/src/termdict/fst_termdict/term_info_store.rs @@ -347,7 +347,7 @@ mod tests { let term_info = TermInfo { doc_freq: i as u32, postings_range: offset(i)..offset(i + 1), - positions_range: offset(i) * 3..offset(i + 1) * 3 + positions_range: offset(i) * 3..offset(i + 1) * 3, }; store_writer.write_term_info(&term_info)?; term_infos.push(term_info); diff --git a/src/termdict/tests.rs b/src/termdict/tests.rs index 54b3e6a90c..99aa90944d 100644 --- a/src/termdict/tests.rs +++ b/src/termdict/tests.rs @@ -13,7 +13,7 @@ fn make_term_info(term_ord: u64) -> TermInfo { TermInfo { doc_freq: term_ord as u32, postings_range: offset(term_ord)..offset(term_ord + 1), - positions_range: offset(term_ord) * 2 ..offset(term_ord + 1) * 2, + positions_range: offset(term_ord) * 2..offset(term_ord + 1) * 2, } }