Skip to content

Commit

Permalink
Add some changes related to compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
SajadKarim committed Oct 1, 2024
1 parent 1446fc3 commit ca604af
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 19 deletions.
1 change: 1 addition & 0 deletions betree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ env_logger = { version = "0.9", optional = true }
core_affinity = "0.5"
async-trait = "0.1"

lz4-sys = "1.9"
lz4 = "1.23.1"
zstd = { version = "0.9", default-features = false }
zstd-safe = { version = "4.0", default-features = false, features = ["experimental"] }
Expand Down
34 changes: 31 additions & 3 deletions betree/src/compression/lz4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ use std::time::Instant;
use speedy::{Readable, Writable};
const DATA_OFF: usize = mem::size_of::<u32>();

use lz4_sys::{LZ4F_compressBound, LZ4FPreferences, LZ4FCompressionContext, LZ4F_createCompressionContext};
use std::ptr;
use lz4_sys::LZ4FFrameInfo;

impl CompressionState for Lz4Compression {
fn finishext2(&mut self, data: &[u8]) -> Result<Buf>
{
panic!("..");
}

fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
let size = data.len();
Expand All @@ -124,7 +133,16 @@ impl CompressionState for Lz4Compression {

fn finish(&mut self, data: Buf) -> Result<Buf> {
let size = data.as_ref().len();
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
// let prefs = LZ4FPreferences {
// frame_info: Default::default(),
// compression_level: self.config.level as u32,
// auto_flush: 1,
// favor_dec_speed: 0,
// reserved: [0; 3],
// };


let mut buf: BufWrite = BufWrite::with_capacity(Block::round_up_from_bytes( (size as u32)));

let mut encoder = EncoderBuilder::new()
.level(u32::from(self.config.level))
Expand All @@ -133,10 +151,18 @@ impl CompressionState for Lz4Compression {
.block_mode(BlockMode::Linked)
.build(buf)?;

//io::copy(&mut data.as_ref(), &mut encoder)?;
encoder.write_all(data.as_ref())?;
let (compressed_data, result) = encoder.finish();

Ok(compressed_data.into_buf())

if let Err(e) = result {
panic!("Compression failed: {:?}", e);
}

let mut buf2 = BufWrite::with_capacity(Block::round_up_from_bytes(compressed_data.as_slice().len() as u32));
buf2.write_all(compressed_data.as_slice());

Ok(buf2.into_buf())
}
// fn finish(&mut self) -> Buf {
// let (v, result) = self.encoder.finish();
Expand All @@ -146,6 +172,8 @@ impl CompressionState for Lz4Compression {
}




impl DecompressionState for Lz4Decompression {
fn decompressext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
Expand Down
1 change: 1 addition & 0 deletions betree/src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub trait CompressionState: Write {
/// compressed data.
fn finish(&mut self, data: Buf) -> Result<Buf>;
fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>;
fn finishext2(&mut self, data: &[u8]) -> Result<Buf>;
}

pub trait DecompressionState {
Expand Down
5 changes: 5 additions & 0 deletions betree/src/compression/none.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ impl io::Write for NoneCompression {
}

impl CompressionState for NoneCompression {
fn finishext2(&mut self, data: &[u8]) -> Result<Buf>
{
panic!("..");
}

fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
Ok(data.clone().to_vec())
Expand Down
60 changes: 48 additions & 12 deletions betree/src/compression/zstd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ use speedy::{Readable, Writable};
const DATA_OFF: usize = mem::size_of::<u32>();

impl CompressionState for ZstdCompression {
fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>
fn finishext2(&mut self, data: &[u8]) -> Result<Buf>
{
let size = zstd_safe::compress_bound(data.len());
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
Expand All @@ -119,12 +119,12 @@ impl CompressionState for ZstdCompression {
.write_to_buffer(&mut buf.as_mut()[..DATA_OFF])
.unwrap();

Ok(buf.as_slice().to_vec())
Ok(buf.into_buf())
}

fn finish(&mut self, data: Buf) -> Result<Buf> {
let start = Instant::now();
let size = zstd_safe::compress_bound(data.as_ref().len());
fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
let size = zstd_safe::compress_bound(data.len());
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
buf.write_all(&[0u8; DATA_OFF])?;

Expand All @@ -146,10 +146,46 @@ impl CompressionState for ZstdCompression {
og_len
.write_to_buffer(&mut buf.as_mut()[..DATA_OFF])
.unwrap();
let duration = start.elapsed();
//println!("Total time elapsed: {:?}", duration);
//println!("Total time elapsed: {} {}", size, buf.get_len());
Ok(buf.into_buf())

Ok(buf.as_slice().to_vec())
}

fn finish(&mut self, data: Buf) -> Result<Buf> {
//panic!("..");
let start = Instant::now();
let size = zstd_safe::compress_bound(data.as_ref().len());
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
//buf.write_all(&[0u8; DATA_OFF])?;

let mut input = zstd::stream::raw::InBuffer::around(&data);
let mut output = zstd::stream::raw::OutBuffer::around_pos(&mut buf, 0);
let mut finished_frame;
loop {
let remaining = self.writer.run(&mut input, &mut output)?;
finished_frame = remaining == 0;
if input.pos() > 0 || data.is_empty() {
break;
}
}
while self.writer.flush(&mut output)? > 0 {}
self.writer.finish(&mut output, finished_frame)?;

// let og_len = data.len() as u32;
// og_len
// .write_to_buffer(&mut buf.as_mut()[..DATA_OFF])
// .unwrap();
// let duration = start.elapsed();
// let b = buf.get_len();
let mut buf2 = BufWrite::with_capacity(Block::round_up_from_bytes(output.as_slice().len() as u32));
buf2.write_all(output.as_slice());

let a = output.as_slice().len();
let b = buf2.into_buf();
let c = buf.into_buf();
//println!("== {:?}", data.as_ref());
//println!("== {:?}", b.as_ref());
//println!("compressed....: {} {} {} {}", size, data.as_ref().len(), b.as_ref().len(), c.as_ref().len());
Ok(b)
}
}

Expand All @@ -158,7 +194,7 @@ impl DecompressionState for ZstdDecompression {
fn decompressext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
//panic!("shukro maula");
let size = u32::read_from_buffer(data).unwrap();
let size = data.len() as u32;
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size));

let mut input = zstd::stream::raw::InBuffer::around(&data[DATA_OFF..]);
Expand Down Expand Up @@ -192,10 +228,10 @@ impl DecompressionState for ZstdDecompression {
//let start = Instant::now();
//panic!("..why");

let size = u32::read_from_buffer(data.as_ref()).unwrap();
let size = data.as_ref().len() as u32;
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size));

let mut input = zstd::stream::raw::InBuffer::around(&data[DATA_OFF..]);
let mut input = zstd::stream::raw::InBuffer::around(&data[..]);
let mut output = zstd::stream::raw::OutBuffer::around(&mut buf);

let mut finished_frame;
Expand Down
6 changes: 3 additions & 3 deletions betree/src/data_management/dmu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ where
//println!("..zz {:?} {}", bytes_to_read, compressed_data.as_ref().len());
let object: Node<ObjRef<ObjectPointer<SPL::Checksum>>> = {
/// TODOooooooooooooooooooooooooooooooooooooooooooooooo fix this!!!!!!! layeeeee
//let data = decompression_state.decompress(compressed_data)?;
//Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), data.into_boxed_slice(), a)?
Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), compressed_data.into_boxed_slice(), a)?
let data = decompression_state.decompress(compressed_data)?;
Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), data.into_boxed_slice(), a)?
//Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), compressed_data.into_boxed_slice(), a)?
};

let key = ObjectKey::Unmodified { offset, generation };
Expand Down
2 changes: 1 addition & 1 deletion betree/src/tree/imp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
dml: X,
storage_preference: StoragePreference,
) -> Self {
let root_node = dml.insert(Node::empty_leaf(true), tree_id, PivotKey::Root(tree_id));
let root_node = dml.insert(Node::empty_leaf(false), tree_id, PivotKey::Root(tree_id));
Tree::new(root_node, tree_id, msg_action, dml, storage_preference)
}

Expand Down

0 comments on commit ca604af

Please sign in to comment.