From eec4adeecb2b8742dc4c8f89c7ea29dd2c109407 Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Mon, 4 Dec 2023 22:02:17 +1100 Subject: [PATCH 1/3] Refactor Integer RLE V2 handling --- src/reader/decode.rs | 6 +- src/reader/decode/rle_v2.rs | 124 +++--- src/reader/decode/rle_v2/delta.rs | 108 +++--- src/reader/decode/rle_v2/direct.rs | 16 +- src/reader/decode/rle_v2/patched_base.rs | 26 +- src/reader/decode/rle_v2/short_repeat.rs | 39 +- src/reader/decode/util.rs | 460 ++++++----------------- 7 files changed, 244 insertions(+), 535 deletions(-) diff --git a/src/reader/decode.rs b/src/reader/decode.rs index ac585246..0c50f3fd 100644 --- a/src/reader/decode.rs +++ b/src/reader/decode.rs @@ -28,7 +28,7 @@ impl RleVersion { ) -> Box> + Send> { match self { RleVersion::V1 => Box::new(UnsignedRleReaderV1::new(reader)), - RleVersion::V2 => Box::new(UnsignedRleReaderV2::new(reader, true)), + RleVersion::V2 => Box::new(UnsignedRleReaderV2::new(reader)), } } } @@ -51,7 +51,7 @@ pub fn get_direct_signed_rle_reader( match column.encoding().kind() { crate::proto::column_encoding::Kind::Direct => Ok(Box::new(SignedRleReaderV1::new(reader))), crate::proto::column_encoding::Kind::DirectV2 => { - Ok(Box::new(RleReaderV2::new(reader, true, true))) + Ok(Box::new(RleReaderV2::new(reader, true))) } k => InvalidColumnEncodingSnafu { name: column.name(), @@ -70,7 +70,7 @@ pub fn get_direct_unsigned_rle_reader( Ok(Box::new(UnsignedRleReaderV1::new(reader))) } crate::proto::column_encoding::Kind::DirectV2 => { - Ok(Box::new(UnsignedRleReaderV2::new(reader, true))) + Ok(Box::new(UnsignedRleReaderV2::new(reader))) } k => InvalidColumnEncodingSnafu { name: column.name(), diff --git a/src/reader/decode/rle_v2.rs b/src/reader/decode/rle_v2.rs index 90587b9c..9c40111b 100644 --- a/src/reader/decode/rle_v2.rs +++ b/src/reader/decode/rle_v2.rs @@ -2,86 +2,67 @@ pub mod delta; pub mod direct; pub mod patched_base; pub mod short_repeat; -use std::io::{ErrorKind, Read}; +use std::{collections::VecDeque, io::Read}; -use crate::error::{self, Result}; +use crate::error::Result; + +use super::util::try_read_u8; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -enum EncodingTypeV2 { +enum EncodingType { ShortRepeat, Direct, PatchedBase, Delta, } -#[inline] -fn run_encoding(header: u8) -> EncodingTypeV2 { - match (header & 128 == 128, header & 64 == 64) { - // 11... = 3 - (true, true) => EncodingTypeV2::Delta, - // 10... = 2 - (true, false) => EncodingTypeV2::PatchedBase, - // 01... = 1 - (false, true) => EncodingTypeV2::Direct, - // 00... = 0 - (false, false) => EncodingTypeV2::ShortRepeat, +impl EncodingType { + /// Checking highest two bits for encoding type. + #[inline] + fn from_header(header: u8) -> Self { + match (header & 128 == 128, header & 64 == 64) { + // 11... = 3 + (true, true) => Self::Delta, + // 10... = 2 + (true, false) => Self::PatchedBase, + // 01... = 1 + (false, true) => Self::Direct, + // 00... = 0 + (false, false) => Self::ShortRepeat, + } } } pub struct RleReaderV2 { reader: R, signed: bool, - literals: Vec, - num_literals: usize, - used: usize, - skip_corrupt: bool, + literals: VecDeque, } -const MAX_SCOPE: usize = 512; +const MAX_RUN_LENGTH: usize = 512; impl RleReaderV2 { - pub fn new(reader: R, signed: bool, skip_corrupt: bool) -> Self { + pub fn new(reader: R, signed: bool) -> Self { Self { reader, - num_literals: 0, signed, - literals: vec![0; MAX_SCOPE], - used: 0, - skip_corrupt, + literals: VecDeque::with_capacity(MAX_RUN_LENGTH), } } // returns false if no more bytes - pub fn read_values(&mut self, ignore_eof: bool) -> Result { - let mut byte = [0u8]; - - if let Err(err) = self.reader.read_exact(&mut byte) { - return match err.kind() { - ErrorKind::UnexpectedEof => { - if !ignore_eof { - Ok(false) - } else { - error::UnexpectedSnafu { - msg: "Read past end of RLE integer reader", - } - .fail() - } - } - _ => error::UnexpectedSnafu { - msg: err.to_string(), - } - .fail(), - }; - } - - let header = byte[0]; - let encoding = run_encoding(header); + pub fn read_values(&mut self) -> Result { + let header = match try_read_u8(&mut self.reader)? { + Some(byte) => byte, + None => return Ok(false), + }; + let encoding = EncodingType::from_header(header); match encoding { - EncodingTypeV2::ShortRepeat => self.read_short_repeat_values(header)?, - EncodingTypeV2::Direct => self.read_direct_values(header)?, - EncodingTypeV2::PatchedBase => self.read_patched_base(header)?, - EncodingTypeV2::Delta => self.read_delta_values(header)?, + EncodingType::ShortRepeat => self.read_short_repeat_values(header)?, + EncodingType::Direct => self.read_direct_values(header)?, + EncodingType::PatchedBase => self.read_patched_base(header)?, + EncodingType::Delta => self.read_delta_values(header)?, } Ok(true) @@ -92,11 +73,8 @@ impl Iterator for RleReaderV2 { type Item = Result; fn next(&mut self) -> Option { - if self.used == self.num_literals { - self.used = 0; - self.num_literals = 0; - - match self.read_values(false) { + if self.literals.is_empty() { + match self.read_values() { Ok(more) => { if !more { return None; @@ -107,9 +85,7 @@ impl Iterator for RleReaderV2 { } } } - let result = self.literals[self.used]; - self.used += 1; - + let result = self.literals.pop_front().unwrap(); Some(Ok(result)) } } @@ -117,8 +93,8 @@ impl Iterator for RleReaderV2 { pub struct UnsignedRleReaderV2(RleReaderV2); impl UnsignedRleReaderV2 { - pub fn new(reader: R, skip_corrupt: bool) -> Self { - Self(RleReaderV2::new(reader, false, skip_corrupt)) + pub fn new(reader: R) -> Self { + Self(RleReaderV2::new(reader, false)) } } @@ -143,7 +119,7 @@ mod test { let expected = [1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1, 1, 1]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, expected); @@ -152,7 +128,7 @@ mod test { let expected = [23713, 43806, 57005, 48879]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, expected); @@ -167,7 +143,7 @@ mod test { ]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, expected); @@ -175,7 +151,7 @@ mod test { let expected = [2u64, 3, 5, 7, 11, 13, 17, 19, 23, 29]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, expected); @@ -183,7 +159,7 @@ mod test { let expected = [2u64, 3, 5, 7, 11, 13, 17, 19, 23, 29]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, expected); @@ -191,7 +167,7 @@ mod test { let expected = [1u64, 1, 1, 1, 1, 1, 1, 1, 1, 1]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, expected); } @@ -202,7 +178,7 @@ mod test { let data: [u8; 3] = [0x0a, 0x27, 0x10]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, vec![10000, 10000, 10000, 10000, 10000]); @@ -214,7 +190,7 @@ mod test { let data: [u8; 10] = [0x5e, 0x03, 0x5c, 0xa1, 0xab, 0x1e, 0xde, 0xad, 0xbe, 0xef]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, vec![23713, 43806, 57005, 48879]); @@ -226,7 +202,7 @@ mod test { let data = [110u8, 3, 0, 185, 66, 1, 86, 60, 1, 189, 90, 1, 125, 222]; let cursor = Cursor::new(data); - let reader = RleReaderV2::new(cursor, true, false); + let reader = RleReaderV2::new(cursor, true); let a = reader.collect::>>().unwrap(); assert_eq!(a, vec![23713, 43806, 57005, 48879]); @@ -241,7 +217,7 @@ mod test { let data: [u8; 8] = [0xc6, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46]; let cursor = Cursor::new(data); - let reader = UnsignedRleReaderV2::new(cursor, false); + let reader = UnsignedRleReaderV2::new(cursor); let a = reader.collect::>>().unwrap(); assert_eq!(a, vec![2, 3, 5, 7, 11, 13, 17, 19, 23, 29]); @@ -259,7 +235,7 @@ mod test { ]; let cursor = Cursor::new(data); - let reader = RleReaderV2::new(cursor, false, false); + let reader = RleReaderV2::new(cursor, false); let a = reader .collect::>>() .unwrap() @@ -301,7 +277,7 @@ mod test { ]; let cursor = Cursor::new(data); - let reader = RleReaderV2::new(cursor, false, false); + let reader = RleReaderV2::new(cursor, false); let a = reader.collect::>>().unwrap(); assert_eq!(a.len(), expected.len()); diff --git a/src/reader/decode/rle_v2/delta.rs b/src/reader/decode/rle_v2/delta.rs index 959ae428..1d48bcf8 100644 --- a/src/reader/decode/rle_v2/delta.rs +++ b/src/reader/decode/rle_v2/delta.rs @@ -1,8 +1,6 @@ use std::io::Read; -use snafu::ensure; - -use crate::error::{self, Result}; +use crate::error::Result; use crate::reader::decode::rle_v2::RleReaderV2; use crate::reader::decode::util::{ read_ints, read_u8, read_vslong, read_vulong, rle_v2_direct_bit_width, @@ -10,80 +8,64 @@ use crate::reader::decode::util::{ impl RleReaderV2 { pub fn read_delta_values(&mut self, header: u8) -> Result<()> { - let mut fb = (header >> 1) & 0x1f; - if fb != 0 { - fb = rle_v2_direct_bit_width(fb); - } + let fb = (header >> 1) & 0x1f; + let delta_bit_width = if fb != 0 { + rle_v2_direct_bit_width(fb) + } else { + fb + }; let reader = &mut self.reader; - let signed = self.signed; // 9 bits for length (L) (1 to 512 values) - let second_byte = read_u8(reader)?; - - let mut length = ((header as u64 & 0x01) << 8) as usize | second_byte as usize; + // Run length encoded as [0, 511] + // Adjust to actual value [1, 512] + // Run length includes base value and first delta + let second_byte = read_u8(reader)? as u16; + let length = ((header as u16 & 0x01) << 8) | second_byte as u16; + let mut length = length as usize; + length += 1; // read the first value stored as vint - let first_val = if signed { + let base_value = if self.signed { read_vslong(reader)? } else { read_vulong(reader)? as i64 }; + self.literals.push_back(base_value); - self.literals[self.num_literals] = first_val; - self.num_literals += 1; - - // if fixed bits is 0 then all values have fixed delta - if fb == 0 { - // read the fixed delta value stored as vint (deltas can be negative even - // if all number are positive) - let fd = read_vslong(reader)?; + // always signed since can be decreasing sequence + let delta_base = read_vslong(reader)?; - if fd == 0 { - ensure!( - self.num_literals == 1, - error::UnexpectedSnafu { - msg: "expected numLiterals to equal 1" - } - ); - for i in self.num_literals..self.num_literals + length { - self.literals[i] = self.literals[0]; - } - self.num_literals += length; - } else { - // add fixed deltas to adjacent values - for _ in 0..length { - self.literals[self.num_literals] = self.literals[self.num_literals - 1] + fd; - self.num_literals += 1; - } - } + // if width is 0 then all values have fixed delta of delta_base + if delta_bit_width == 0 { + // skip first value since that's base_value + (1..length).fold(base_value, |acc, _| { + let acc = acc + delta_base; + self.literals.push_back(acc); + acc + }); } else { - let delta_base = read_vslong(reader)?; // add delta base and first value - self.literals[self.num_literals] = first_val + delta_base; - self.num_literals += 1; - let mut prev_val = self.literals[self.num_literals - 1]; - length -= 1; + let second_value = base_value + delta_base; + self.literals.push_back(second_value); + length -= 2; // base_value and first delta vaue - // write the unpacked values, add it to previous value and store final - // value to result buffer. if the delta base value is negative then it - // is a decreasing sequence else an increasing sequence - read_ints( - &mut self.literals, - self.num_literals, - length, - fb as usize, - reader, - )?; - while length > 0 { - if delta_base < 0 { - self.literals[self.num_literals] = prev_val - self.literals[self.num_literals]; - } else { - self.literals[self.num_literals] += prev_val - } - prev_val = self.literals[self.num_literals]; - length -= 1; - self.num_literals += 1; - } + // unpack the delta values + read_ints(&mut self.literals, length, delta_bit_width as usize, reader)?; + self.literals + .iter_mut() + // ignore base_value and first delta + .skip(2) + // each element is the delta, so find actual value using running accumulator + .fold(second_value, |acc, delta| { + let acc = if delta_base < 0 { + acc.saturating_sub(*delta) + } else { + acc.saturating_add(*delta) + }; + *delta = acc; + acc + }); } Ok(()) } diff --git a/src/reader/decode/rle_v2/direct.rs b/src/reader/decode/rle_v2/direct.rs index cde2fccd..96a9b796 100644 --- a/src/reader/decode/rle_v2/direct.rs +++ b/src/reader/decode/rle_v2/direct.rs @@ -17,22 +17,12 @@ impl RleReaderV2 { length += 1; // write the unpacked values and zigzag decode to result buffer - read_ints( - &mut self.literals, - self.num_literals, - length, - fb as usize, - &mut self.reader, - )?; + read_ints(&mut self.literals, length, fb as usize, &mut self.reader)?; if self.signed { - for _ in 0..length { - self.literals[self.num_literals] = - zigzag_decode((self.literals[self.num_literals]) as u64); - self.num_literals += 1; + for lit in self.literals.iter_mut() { + *lit = zigzag_decode(*lit as u64); } - } else { - self.num_literals += length; } Ok(()) diff --git a/src/reader/decode/rle_v2/patched_base.rs b/src/reader/decode/rle_v2/patched_base.rs index 36fd14bc..c76f463a 100644 --- a/src/reader/decode/rle_v2/patched_base.rs +++ b/src/reader/decode/rle_v2/patched_base.rs @@ -1,7 +1,8 @@ +use std::collections::VecDeque; use std::io::Read; use super::RleReaderV2; -use crate::error::Result; +use crate::error::{OutOfSpecSnafu, Result}; use crate::reader::decode::util::{ bytes_to_long_be, get_closest_fixed_bits, header_to_rle_v2_direct_bit_width, read_ints, read_u8, rle_v2_direct_bit_width, @@ -47,23 +48,22 @@ impl RleReaderV2 { base = -base } - let mut unpacked = vec![0i64; length]; - - read_ints(&mut unpacked, 0, length, bit_width as usize, reader)?; - - let mut unpacked_patch = vec![0i64; patch_list_length as usize]; + let mut unpacked = VecDeque::with_capacity(length); + read_ints(&mut unpacked, length, bit_width as usize, reader)?; let width = patch_width as usize + patch_gap_width as usize; - - if width > 64 && !self.skip_corrupt { - // TODO: throw error + if width > 64 { + return OutOfSpecSnafu { + msg: "combined patch width and patch gap width cannot be greater than 64 bits", + } + .fail(); } let bit_size = get_closest_fixed_bits(width); + let mut unpacked_patch = VecDeque::with_capacity(patch_list_length as usize); read_ints( &mut unpacked_patch, - 0, patch_list_length as usize, bit_size, reader, @@ -87,8 +87,7 @@ impl RleReaderV2 { if i == actual_gap as usize { let patched_value = item | (current_patch << bit_width); - self.literals[self.num_literals] = base + patched_value; - self.num_literals += 1; + self.literals.push_back(base + patched_value); patch_index += 1; @@ -108,8 +107,7 @@ impl RleReaderV2 { actual_gap += i as i64; } } else { - self.literals[self.num_literals] = base + item; - self.num_literals += 1; + self.literals.push_back(base + item); } } diff --git a/src/reader/decode/rle_v2/short_repeat.rs b/src/reader/decode/rle_v2/short_repeat.rs index 1e4c48e8..3828d6ad 100644 --- a/src/reader/decode/rle_v2/short_repeat.rs +++ b/src/reader/decode/rle_v2/short_repeat.rs @@ -1,23 +1,32 @@ use std::io::Read; -use snafu::ensure; - -use crate::error::{self, Result}; +use crate::error::Result; use crate::reader::decode::rle_v2::RleReaderV2; use crate::reader::decode::util::{bytes_to_long_be, zigzag_decode}; -// MIN_REPEAT_SIZE is the minimum number of repeated values required to use run length encoding. +/// Minimum number of repeated values required to use run length encoding. const MIN_REPEAT_SIZE: usize = 3; impl RleReaderV2 { pub fn read_short_repeat_values(&mut self, header: u8) -> Result<()> { - let size = ((header as u64) >> 3) & 0x07; - let size = size + 1; + // Header byte: + // + // eeww_wccc + // 7 0 LSB + // + // ee = Sub-encoding bits, always 00 + // www = Value width bits + // ccc = Repeat count bits + + let value_byte_width = (header >> 3) & 0x07; // Encoded as 0 to 7 + let value_byte_width = value_byte_width as usize + 1; // Decode to 1 to 8 bytes - let mut l = (header & 0x07) as usize; - l += MIN_REPEAT_SIZE; + let run_length = (header & 0x07) as usize; + let run_length = run_length + MIN_REPEAT_SIZE; - let val = bytes_to_long_be(&mut self.reader, size as usize)?; + // Value that is being repeated is encoded as N bytes in big endian format + // Where N = value_byte_width + let val = bytes_to_long_be(&mut self.reader, value_byte_width)?; let val = if self.signed { zigzag_decode(val as u64) @@ -25,17 +34,9 @@ impl RleReaderV2 { val }; - ensure!( - self.num_literals == 0, - error::UnexpectedSnafu { - msg: "readValues called with existing values present" - } - ); - - for i in 0..l { - self.literals[i] = val; + for _ in 0..run_length { + self.literals.push_back(val); } - self.num_literals = l; Ok(()) } diff --git a/src/reader/decode/util.rs b/src/reader/decode/util.rs index d82aadaf..0aeec43d 100644 --- a/src/reader/decode/util.rs +++ b/src/reader/decode/util.rs @@ -1,8 +1,8 @@ -use std::io::Read; +use std::{collections::VecDeque, io::Read}; use snafu::{OptionExt, ResultExt}; -use crate::error::{self, Result, VarintTooLargeSnafu}; +use crate::error::{self, IoSnafu, Result, VarintTooLargeSnafu}; /// Read single byte #[inline] @@ -24,16 +24,14 @@ pub fn try_read_u8(reader: &mut impl Read) -> Result> { } } -pub fn bytes_to_long_be(r: &mut R, mut n: usize) -> Result { - let mut out: i64 = 0; - - while n > 0 { - n -= 1; - let val = read_u8(r)? as i64; - out |= val << (n * 8) as u64; - } - - Ok(out) +#[inline] +pub fn bytes_to_long_be(r: &mut R, byte_width: usize) -> Result { + let mut buffer = [0; 8]; + // read into back part of buffer since is big endian + // so if smaller than 8 bytes, most significant bytes will be 0 + r.read_exact(&mut buffer[8 - byte_width..]) + .context(IoSnafu)?; + Ok(i64::from_be_bytes(buffer)) } pub fn get_closest_fixed_bits(width: usize) -> usize { @@ -52,29 +50,27 @@ pub fn get_closest_fixed_bits(width: usize) -> usize { } pub fn read_ints( - buffer: &mut [i64], - offset: usize, - len: usize, + queue: &mut VecDeque, + expected_no_of_ints: usize, bit_size: usize, r: &mut impl Read, ) -> Result<()> { - let mut bits_left = 0; - let mut current = 0; - match bit_size { - 1 => unrolled_unpack_1(buffer, offset, len, r), - 2 => unrolled_unpack_2(buffer, offset, len, r), - 4 => unrolled_unpack_4(buffer, offset, len, r), - 8 => unrolled_unpack_8(buffer, offset, len, r), - 16 => unrolled_unpack_16(buffer, offset, len, r), - 24 => unrolled_unpack_24(buffer, offset, len, r), - 32 => unrolled_unpack_32(buffer, offset, len, r), - 40 => unrolled_unpack_40(buffer, offset, len, r), - 48 => unrolled_unpack_48(buffer, offset, len, r), - 56 => unrolled_unpack_56(buffer, offset, len, r), - 64 => unrolled_unpack_64(buffer, offset, len, r), + 1 => unrolled_unpack_1(queue, expected_no_of_ints, r), + 2 => unrolled_unpack_2(queue, expected_no_of_ints, r), + 4 => unrolled_unpack_4(queue, expected_no_of_ints, r), + 8 => unrolled_unpack_8(queue, expected_no_of_ints, r), + 16 => unrolled_unpack_16(queue, expected_no_of_ints, r), + 24 => unrolled_unpack_24(queue, expected_no_of_ints, r), + 32 => unrolled_unpack_32(queue, expected_no_of_ints, r), + 40 => unrolled_unpack_40(queue, expected_no_of_ints, r), + 48 => unrolled_unpack_48(queue, expected_no_of_ints, r), + 56 => unrolled_unpack_56(queue, expected_no_of_ints, r), + 64 => unrolled_unpack_64(queue, expected_no_of_ints, r), _ => { - for item in buffer.iter_mut().skip(offset).take(len) { + let mut bits_left = 0; + let mut current = 0; + for _ in 0..expected_no_of_ints { let mut result: i64 = 0; let mut bits_left_to_read = bit_size; @@ -94,7 +90,7 @@ pub fn read_ints( result |= ((current >> bits_left) & ((1 << bits_left_to_read) - 1)) as i64; } - *item = result; + queue.push_back(result); } Ok(()) @@ -102,35 +98,31 @@ pub fn read_ints( } } +/// Decode numbers with bit width of 1 from read stream fn unrolled_unpack_1( - buffer: &mut [i64], - offset: usize, - len: usize, + buffer: &mut VecDeque, + expected_num_of_ints: usize, reader: &mut impl Read, ) -> Result<()> { - let num_hops = 8; - let remainder = len % num_hops; - let end_offset = offset + len; - let end_unroll = end_offset - remainder; - - for i in (offset..end_unroll).step_by(num_hops) { - let byte = read_u8(reader)?; - let val = byte as u64; - buffer[i] = ((val >> 7) & 1) as i64; - buffer[i + 1] = ((val >> 6) & 1) as i64; - buffer[i + 2] = ((val >> 5) & 1) as i64; - buffer[i + 3] = ((val >> 4) & 1) as i64; - buffer[i + 4] = ((val >> 3) & 1) as i64; - buffer[i + 5] = ((val >> 2) & 1) as i64; - buffer[i + 6] = ((val >> 1) & 1) as i64; - buffer[i + 7] = (val & 1) as i64; + for _ in 0..(expected_num_of_ints / 8) { + let byte = read_u8(reader)? as i64; + buffer.push_back((byte >> 7) & 1); + buffer.push_back((byte >> 6) & 1); + buffer.push_back((byte >> 5) & 1); + buffer.push_back((byte >> 4) & 1); + buffer.push_back((byte >> 3) & 1); + buffer.push_back((byte >> 2) & 1); + buffer.push_back((byte >> 1) & 1); + buffer.push_back(byte & 1); } + // less than full byte at end, extract these trailing numbers + let remainder = expected_num_of_ints % 8; if remainder > 0 { let mut start_shift = 7; - let val = read_u8(reader)? as u64; - for item in buffer.iter_mut().take(end_offset).skip(end_unroll) { - *item = ((val >> start_shift) & 1) as i64; + let byte = read_u8(reader)? as i64; + for _ in 0..remainder { + buffer.push_back((byte >> start_shift) & 1); start_shift -= 1; } } @@ -138,364 +130,134 @@ fn unrolled_unpack_1( Ok(()) } +/// Decode numbers with bit width of 2 from read stream fn unrolled_unpack_2( - buffer: &mut [i64], - offset: usize, - len: usize, + buffer: &mut VecDeque, + expected_num_of_ints: usize, reader: &mut impl Read, ) -> Result<()> { - let num_hops = 4; - let remainder = len % num_hops; - let end_offset = offset + len; - let end_unroll = end_offset - remainder; - - for i in (offset..end_unroll).step_by(num_hops) { - let val = read_u8(reader)? as u64; - buffer[i] = ((val >> 6) & 3) as i64; - buffer[i + 1] = ((val >> 4) & 3) as i64; - buffer[i + 2] = (val >> 2 & 3) as i64; - buffer[i + 3] = ((val) & 3) as i64; + for _ in 0..(expected_num_of_ints / 4) { + let byte = read_u8(reader)? as i64; + buffer.push_back((byte >> 6) & 3); + buffer.push_back((byte >> 4) & 3); + buffer.push_back((byte >> 2) & 3); + buffer.push_back(byte & 3); } + // less than full byte at end, extract these trailing numbers + let remainder = expected_num_of_ints % 4; if remainder > 0 { let mut start_shift = 6; - let val = read_u8(reader)? as u64; - for item in buffer.iter_mut().take(end_offset).skip(end_unroll) { - *item = ((val >> start_shift) & 3) as i64; + let byte = read_u8(reader)? as i64; + for _ in 0..remainder { + buffer.push_back((byte >> start_shift) & 3); start_shift -= 2; } } + Ok(()) } +/// Decode numbers with bit width of 4 from read stream fn unrolled_unpack_4( - buffer: &mut [i64], - offset: usize, - len: usize, + buffer: &mut VecDeque, + expected_num_of_ints: usize, reader: &mut impl Read, ) -> Result<()> { - let num_hops = 2; - let remainder = len % num_hops; - let end_offset = offset + len; - let end_unroll = end_offset - remainder; - - for i in (offset..end_unroll).step_by(num_hops) { - let val = read_u8(reader)? as u64; - buffer[i] = ((val >> 4) & 15) as i64; - buffer[i + 1] = (val & 15) as i64; + for _ in 0..(expected_num_of_ints / 2) { + let byte = read_u8(reader)? as i64; + buffer.push_back((byte >> 4) & 15); + buffer.push_back(byte & 15); } + // at worst have 1 trailing 4-bit number + let remainder = expected_num_of_ints % 2; if remainder > 0 { - let mut start_shift = 4; - let val = read_u8(reader)? as u64; - for item in buffer.iter_mut().take(end_offset).skip(end_unroll) { - *item = ((val >> start_shift) & 15) as i64; - start_shift -= 4; - } + let byte = read_u8(reader)? as i64; + buffer.push_back((byte >> 4) & 15); } + Ok(()) } fn unrolled_unpack_8( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, ) -> Result<()> { - unrolled_unpack_bytes(buffer, offset, len, r, 1) + unrolled_unpack_bytes(buffer, expected_num_of_ints, r, 1) } fn unrolled_unpack_16( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, ) -> Result<()> { - unrolled_unpack_bytes(buffer, offset, len, r, 2) + unrolled_unpack_bytes(buffer, expected_num_of_ints, r, 2) } fn unrolled_unpack_24( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, ) -> Result<()> { - unrolled_unpack_bytes(buffer, offset, len, r, 3) + unrolled_unpack_bytes(buffer, expected_num_of_ints, r, 3) } fn unrolled_unpack_32( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, ) -> Result<()> { - unrolled_unpack_bytes(buffer, offset, len, r, 4) + unrolled_unpack_bytes(buffer, expected_num_of_ints, r, 4) } fn unrolled_unpack_40( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, ) -> Result<()> { - unrolled_unpack_bytes(buffer, offset, len, r, 5) + unrolled_unpack_bytes(buffer, expected_num_of_ints, r, 5) } fn unrolled_unpack_48( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, ) -> Result<()> { - unrolled_unpack_bytes(buffer, offset, len, r, 6) + unrolled_unpack_bytes(buffer, expected_num_of_ints, r, 6) } fn unrolled_unpack_56( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, ) -> Result<()> { - unrolled_unpack_bytes(buffer, offset, len, r, 7) + unrolled_unpack_bytes(buffer, expected_num_of_ints, r, 7) } fn unrolled_unpack_64( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, ) -> Result<()> { - unrolled_unpack_bytes(buffer, offset, len, r, 8) + unrolled_unpack_bytes(buffer, expected_num_of_ints, r, 8) } +#[inline] fn unrolled_unpack_bytes( - buffer: &mut [i64], - offset: usize, - len: usize, - r: &mut dyn Read, - num_bytes: usize, -) -> Result<()> { - let num_hops = 8; - let remainder = len % num_hops; - let end_offset = offset + len; - let end_unroll = end_offset - remainder; - let mut i = offset; - while i < end_unroll { - read_long_be(r, buffer, i, num_hops, num_bytes)?; - i += num_hops; - } - if remainder > 0 { - read_remaining_longs(buffer, i, r, remainder, num_bytes)?; - } - Ok(()) -} - -fn read_remaining_longs( - buffer: &mut [i64], - mut offset: usize, - r: &mut dyn Read, - mut remainder: usize, - num_bytes: usize, -) -> Result<()> { - let to_read = remainder * num_bytes; - let mut read_buffer = vec![0u8; to_read]; - r.read_exact(&mut read_buffer).context(error::IoSnafu)?; - - let mut idx = 0; - match num_bytes { - 1 => { - while remainder > 0 { - buffer[offset] = i64::from(read_buffer[idx]); - offset += 1; - remainder -= 1; - idx += 1; - } - } - 2 => { - while remainder > 0 { - buffer[offset] = read_long_be2(&read_buffer, idx * 2); - offset += 1; - remainder -= 1; - idx += 1; - } - } - 3 => { - while remainder > 0 { - buffer[offset] = read_long_be3(&read_buffer, idx * 3); - offset += 1; - remainder -= 1; - idx += 1; - } - } - 4 => { - while remainder > 0 { - buffer[offset] = read_long_be4(&read_buffer, idx * 4); - offset += 1; - remainder -= 1; - idx += 1; - } - } - 5 => { - while remainder > 0 { - buffer[offset] = read_long_be5(&read_buffer, idx * 5); - offset += 1; - remainder -= 1; - idx += 1; - } - } - 6 => { - while remainder > 0 { - buffer[offset] = read_long_be6(&read_buffer, idx * 6); - offset += 1; - remainder -= 1; - idx += 1; - } - } - 7 => { - while remainder > 0 { - buffer[offset] = read_long_be7(&read_buffer, idx * 7); - offset += 1; - remainder -= 1; - idx += 1; - } - } - 8 => { - while remainder > 0 { - buffer[offset] = read_long_be8(&read_buffer, idx * 8); - offset += 1; - remainder -= 1; - idx += 1; - } - } - _ => { - return error::InvalidInputSnafu { - msg: "Invalid number of bytes", - } - .fail(); - } - } - - Ok(()) -} - -fn read_long_be( - r: &mut dyn Read, - buffer: &mut [i64], - start: usize, - num_hops: usize, + buffer: &mut VecDeque, + expected_num_of_ints: usize, + r: &mut impl Read, num_bytes: usize, ) -> Result<()> { - let to_read = num_hops * num_bytes; - let mut read_buffer = vec![0u8; to_read]; - r.read_exact(&mut read_buffer).context(error::IoSnafu)?; - - match num_bytes { - 1 => { - for i in 0..8 { - buffer[start + i] = i64::from(read_buffer[i]); - } - } - 2 => { - for i in 0..8 { - buffer[start + i] = read_long_be2(&read_buffer, i * 2); - } - } - 3 => { - for i in 0..8 { - buffer[start + i] = read_long_be3(&read_buffer, i * 3); - } - } - 4 => { - for i in 0..8 { - buffer[start + i] = read_long_be4(&read_buffer, i * 4); - } - } - 5 => { - for i in 0..8 { - buffer[start + i] = read_long_be5(&read_buffer, i * 5); - } - } - 6 => { - for i in 0..8 { - buffer[start + i] = read_long_be6(&read_buffer, i * 6); - } - } - 7 => { - for i in 0..8 { - buffer[start + i] = read_long_be7(&read_buffer, i * 7); - } - } - 8 => { - for i in 0..8 { - buffer[start + i] = read_long_be8(&read_buffer, i * 8); - } - } - _ => { - return error::InvalidInputSnafu { - msg: "Invalid number of bytes", - } - .fail(); - } + for _ in 0..expected_num_of_ints { + let num = bytes_to_long_be(r, num_bytes)?; + buffer.push_back(num); } - Ok(()) } -fn read_long_be2(read_buffer: &[u8], rb_offset: usize) -> i64 { - (i64::from(read_buffer[rb_offset]) << 8) + i64::from(read_buffer[rb_offset + 1]) -} - -fn read_long_be3(read_buffer: &[u8], rb_offset: usize) -> i64 { - (i64::from(read_buffer[rb_offset]) << 16) - + (i64::from(read_buffer[rb_offset + 1]) << 8) - + i64::from(read_buffer[rb_offset + 2]) -} - -fn read_long_be4(read_buffer: &[u8], rb_offset: usize) -> i64 { - (i64::from(read_buffer[rb_offset]) << 24) - + (i64::from(read_buffer[rb_offset + 1]) << 16) - + (i64::from(read_buffer[rb_offset + 2]) << 8) - + i64::from(read_buffer[rb_offset + 3]) -} - -fn read_long_be5(read_buffer: &[u8], rb_offset: usize) -> i64 { - (i64::from(read_buffer[rb_offset]) << 32) - + (i64::from(read_buffer[rb_offset + 1]) << 24) - + (i64::from(read_buffer[rb_offset + 2]) << 16) - + (i64::from(read_buffer[rb_offset + 3]) << 8) - + i64::from(read_buffer[rb_offset + 4]) -} - -fn read_long_be6(read_buffer: &[u8], rb_offset: usize) -> i64 { - (i64::from(read_buffer[rb_offset]) << 40) - + (i64::from(read_buffer[rb_offset + 1]) << 32) - + (i64::from(read_buffer[rb_offset + 2]) << 24) - + (i64::from(read_buffer[rb_offset + 3]) << 16) - + (i64::from(read_buffer[rb_offset + 4]) << 8) - + i64::from(read_buffer[rb_offset + 5]) -} - -fn read_long_be7(read_buffer: &[u8], rb_offset: usize) -> i64 { - (i64::from(read_buffer[rb_offset]) << 48) - + (i64::from(read_buffer[rb_offset + 1]) << 40) - + (i64::from(read_buffer[rb_offset + 2]) << 32) - + (i64::from(read_buffer[rb_offset + 3]) << 24) - + (i64::from(read_buffer[rb_offset + 4]) << 16) - + (i64::from(read_buffer[rb_offset + 5]) << 8) - + i64::from(read_buffer[rb_offset + 6]) -} - -fn read_long_be8(read_buffer: &[u8], rb_offset: usize) -> i64 { - (i64::from(read_buffer[rb_offset]) << 56) - + (i64::from(read_buffer[rb_offset + 1]) << 48) - + (i64::from(read_buffer[rb_offset + 2]) << 40) - + (i64::from(read_buffer[rb_offset + 3]) << 32) - + (i64::from(read_buffer[rb_offset + 4]) << 24) - + (i64::from(read_buffer[rb_offset + 5]) << 16) - + (i64::from(read_buffer[rb_offset + 6]) << 8) - + i64::from(read_buffer[rb_offset + 7]) -} - pub fn rle_v2_direct_bit_width(value: u8) -> u8 { match value { 0..=23 => value + 1, From b8287a574566f76b09c96c010f882c1735f68ea9 Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Mon, 4 Dec 2023 22:08:45 +1100 Subject: [PATCH 2/3] Fix --- src/reader/decode/rle_v2/delta.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reader/decode/rle_v2/delta.rs b/src/reader/decode/rle_v2/delta.rs index 1d48bcf8..9e08bc3e 100644 --- a/src/reader/decode/rle_v2/delta.rs +++ b/src/reader/decode/rle_v2/delta.rs @@ -21,7 +21,7 @@ impl RleReaderV2 { // Adjust to actual value [1, 512] // Run length includes base value and first delta let second_byte = read_u8(reader)? as u16; - let length = ((header as u16 & 0x01) << 8) | second_byte as u16; + let length = ((header as u16 & 0x01) << 8) | second_byte; let mut length = length as usize; length += 1; @@ -48,7 +48,7 @@ impl RleReaderV2 { // add delta base and first value let second_value = base_value + delta_base; self.literals.push_back(second_value); - length -= 2; // base_value and first delta vaue + length -= 2; // base_value and first delta value // unpack the delta values read_ints(&mut self.literals, length, delta_bit_width as usize, reader)?; From c55fd277c365a379f172d7d98bd77936afff82a3 Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Mon, 4 Dec 2023 22:10:19 +1100 Subject: [PATCH 3/3] Trigger