Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Integer RLE V2 handling #50

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/reader/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl RleVersion {
) -> Box<dyn Iterator<Item = Result<u64>> + Send> {
match self {
RleVersion::V1 => Box::new(UnsignedRleReaderV1::new(reader)),
RleVersion::V2 => Box::new(UnsignedRleReaderV2::new(reader, true)),
RleVersion::V2 => Box::new(UnsignedRleReaderV2::new(reader)),
}
}
}
Expand All @@ -51,7 +51,7 @@ pub fn get_direct_signed_rle_reader<R: Read + Send + 'static>(
match column.encoding().kind() {
crate::proto::column_encoding::Kind::Direct => Ok(Box::new(SignedRleReaderV1::new(reader))),
crate::proto::column_encoding::Kind::DirectV2 => {
Ok(Box::new(RleReaderV2::new(reader, true, true)))
Ok(Box::new(RleReaderV2::new(reader, true)))
}
k => InvalidColumnEncodingSnafu {
name: column.name(),
Expand All @@ -70,7 +70,7 @@ pub fn get_direct_unsigned_rle_reader<R: Read + Send + 'static>(
Ok(Box::new(UnsignedRleReaderV1::new(reader)))
}
crate::proto::column_encoding::Kind::DirectV2 => {
Ok(Box::new(UnsignedRleReaderV2::new(reader, true)))
Ok(Box::new(UnsignedRleReaderV2::new(reader)))
}
k => InvalidColumnEncodingSnafu {
name: column.name(),
Expand Down
124 changes: 50 additions & 74 deletions src/reader/decode/rle_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,67 @@ pub mod delta;
pub mod direct;
pub mod patched_base;
pub mod short_repeat;
use std::io::{ErrorKind, Read};
use std::{collections::VecDeque, io::Read};

use crate::error::{self, Result};
use crate::error::Result;

use super::util::try_read_u8;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum EncodingTypeV2 {
enum EncodingType {
ShortRepeat,
Direct,
PatchedBase,
Delta,
}

#[inline]
fn run_encoding(header: u8) -> EncodingTypeV2 {
match (header & 128 == 128, header & 64 == 64) {
// 11... = 3
(true, true) => EncodingTypeV2::Delta,
// 10... = 2
(true, false) => EncodingTypeV2::PatchedBase,
// 01... = 1
(false, true) => EncodingTypeV2::Direct,
// 00... = 0
(false, false) => EncodingTypeV2::ShortRepeat,
impl EncodingType {
/// Checking highest two bits for encoding type.
#[inline]
fn from_header(header: u8) -> Self {
match (header & 128 == 128, header & 64 == 64) {
// 11... = 3
(true, true) => Self::Delta,
// 10... = 2
(true, false) => Self::PatchedBase,
// 01... = 1
(false, true) => Self::Direct,
// 00... = 0
(false, false) => Self::ShortRepeat,
}
}
}

pub struct RleReaderV2<R> {
reader: R,
signed: bool,
literals: Vec<i64>,
num_literals: usize,
used: usize,
skip_corrupt: bool,
literals: VecDeque<i64>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VecDeque provides the same functionality, without us needing to fiddle with the indices/len ourselves

}

const MAX_SCOPE: usize = 512;
const MAX_RUN_LENGTH: usize = 512;

impl<R: Read> RleReaderV2<R> {
pub fn new(reader: R, signed: bool, skip_corrupt: bool) -> Self {
pub fn new(reader: R, signed: bool) -> Self {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should provide functionality for skipping corrupt streams:

  • If this integer stream is corrupt, then since columns are made of multiple streams, chances are the expected output will also be corrupt, as the stream lengths won't match

Self {
reader,
num_literals: 0,
signed,
literals: vec![0; MAX_SCOPE],
used: 0,
skip_corrupt,
literals: VecDeque::with_capacity(MAX_RUN_LENGTH),
}
}

// returns false if no more bytes
pub fn read_values(&mut self, ignore_eof: bool) -> Result<bool> {
let mut byte = [0u8];

if let Err(err) = self.reader.read_exact(&mut byte) {
return match err.kind() {
ErrorKind::UnexpectedEof => {
if !ignore_eof {
Ok(false)
} else {
error::UnexpectedSnafu {
msg: "Read past end of RLE integer reader",
}
.fail()
}
}
_ => error::UnexpectedSnafu {
msg: err.to_string(),
}
.fail(),
};
}

let header = byte[0];
let encoding = run_encoding(header);
pub fn read_values(&mut self) -> Result<bool> {
let header = match try_read_u8(&mut self.reader)? {
Some(byte) => byte,
None => return Ok(false),
};
Comment on lines +54 to +58
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the ignore_eof logic, as I have similar thoughts about it as about the removal of skip_corrupt


let encoding = EncodingType::from_header(header);
match encoding {
EncodingTypeV2::ShortRepeat => self.read_short_repeat_values(header)?,
EncodingTypeV2::Direct => self.read_direct_values(header)?,
EncodingTypeV2::PatchedBase => self.read_patched_base(header)?,
EncodingTypeV2::Delta => self.read_delta_values(header)?,
EncodingType::ShortRepeat => self.read_short_repeat_values(header)?,
EncodingType::Direct => self.read_direct_values(header)?,
EncodingType::PatchedBase => self.read_patched_base(header)?,
EncodingType::Delta => self.read_delta_values(header)?,
}

Ok(true)
Expand All @@ -92,11 +73,8 @@ impl<R: Read> Iterator for RleReaderV2<R> {
type Item = Result<i64>;

fn next(&mut self) -> Option<Self::Item> {
if self.used == self.num_literals {
self.used = 0;
self.num_literals = 0;

match self.read_values(false) {
if self.literals.is_empty() {
match self.read_values() {
Ok(more) => {
if !more {
return None;
Expand All @@ -107,18 +85,16 @@ impl<R: Read> Iterator for RleReaderV2<R> {
}
}
}
let result = self.literals[self.used];
self.used += 1;

let result = self.literals.pop_front().unwrap();
Some(Ok(result))
}
}

pub struct UnsignedRleReaderV2<R>(RleReaderV2<R>);

impl<R: Read> UnsignedRleReaderV2<R> {
pub fn new(reader: R, skip_corrupt: bool) -> Self {
Self(RleReaderV2::new(reader, false, skip_corrupt))
pub fn new(reader: R) -> Self {
Self(RleReaderV2::new(reader, false))
}
}

Expand All @@ -143,7 +119,7 @@ mod test {
let expected = [1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1, 1, 1];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();
assert_eq!(a, expected);

Expand All @@ -152,7 +128,7 @@ mod test {
let expected = [23713, 43806, 57005, 48879];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();
assert_eq!(a, expected);

Expand All @@ -167,31 +143,31 @@ mod test {
];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();
assert_eq!(a, expected);

let data = [196u8, 9, 2, 2, 74, 40, 166];
let expected = [2u64, 3, 5, 7, 11, 13, 17, 19, 23, 29];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();
assert_eq!(a, expected);

let data = [0xc6u8, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46];
let expected = [2u64, 3, 5, 7, 11, 13, 17, 19, 23, 29];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();
assert_eq!(a, expected);

let data = [7u8, 1];
let expected = [1u64, 1, 1, 1, 1, 1, 1, 1, 1, 1];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();
assert_eq!(a, expected);
}
Expand All @@ -202,7 +178,7 @@ mod test {
let data: [u8; 3] = [0x0a, 0x27, 0x10];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();

assert_eq!(a, vec![10000, 10000, 10000, 10000, 10000]);
Expand All @@ -214,7 +190,7 @@ mod test {
let data: [u8; 10] = [0x5e, 0x03, 0x5c, 0xa1, 0xab, 0x1e, 0xde, 0xad, 0xbe, 0xef];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();

assert_eq!(a, vec![23713, 43806, 57005, 48879]);
Expand All @@ -226,7 +202,7 @@ mod test {
let data = [110u8, 3, 0, 185, 66, 1, 86, 60, 1, 189, 90, 1, 125, 222];

let cursor = Cursor::new(data);
let reader = RleReaderV2::new(cursor, true, false);
let reader = RleReaderV2::new(cursor, true);
let a = reader.collect::<Result<Vec<_>>>().unwrap();

assert_eq!(a, vec![23713, 43806, 57005, 48879]);
Expand All @@ -241,7 +217,7 @@ mod test {
let data: [u8; 8] = [0xc6, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46];

let cursor = Cursor::new(data);
let reader = UnsignedRleReaderV2::new(cursor, false);
let reader = UnsignedRleReaderV2::new(cursor);
let a = reader.collect::<Result<Vec<_>>>().unwrap();

assert_eq!(a, vec![2, 3, 5, 7, 11, 13, 17, 19, 23, 29]);
Expand All @@ -259,7 +235,7 @@ mod test {
];

let cursor = Cursor::new(data);
let reader = RleReaderV2::new(cursor, false, false);
let reader = RleReaderV2::new(cursor, false);
let a = reader
.collect::<Result<Vec<_>>>()
.unwrap()
Expand Down Expand Up @@ -301,7 +277,7 @@ mod test {
];

let cursor = Cursor::new(data);
let reader = RleReaderV2::new(cursor, false, false);
let reader = RleReaderV2::new(cursor, false);
let a = reader.collect::<Result<Vec<_>>>().unwrap();

assert_eq!(a.len(), expected.len());
Expand Down
Loading