multithreaded compressor/decompressor
garikello3d committed Jan 7, 2024
1 parent 44e863c commit 7579d0e
Showing 5 changed files with 122 additions and 76 deletions.
22 changes: 21 additions & 1 deletion src/arg_opts.rs
@@ -38,6 +38,10 @@ pub enum Commands {
#[arg(long, value_name = "level")]
compress_level: u8,

/// How many threads to use for compression; defaults to the number of CPU cores if omitted
#[arg(long, value_name = "how_many")]
compress_threads: Option<usize>,

/// Buffer size for reading stdin data, in MB
#[arg(long, value_name ="size_mb")]
buf_size: usize,
@@ -56,6 +60,10 @@ pub enum Commands {
#[arg(long, value_name = "password")]
pass: String,

/// How many threads to use for decompression; defaults to the number of CPU cores if omitted
#[arg(long, value_name = "how_many")]
decompress_threads: Option<usize>,

/// Buffer size for reading disk files, in MB
#[arg(long, value_name ="size_mb")]
buf_size: usize,
@@ -78,11 +86,15 @@ pub enum Commands {
#[arg(long, value_name = "password")]
pass: String,

/// How many threads to use for decompression; defaults to the number of CPU cores if omitted
#[arg(long, value_name = "how_many")]
decompress_threads: Option<usize>,

/// Buffer size for reading disk files, in MB
#[arg(long, value_name ="size_mb")]
buf_size: usize,
},
/// Benchmark mode: read data from stdin and try different combinations of input params to see how fast the process is
/// Benchmark mode: read data from stdin and try different combinations of input params to see how fast the archiving is
Bench {
/// Path to directory to store temporary files
#[arg(long, value_name = "/path/to/dir")]
@@ -99,5 +111,13 @@ pub enum Commands {
/// Buffer sizes for reading stdin data to try, comma-separated values (in MB)
#[arg(long, value_name ="size,size,size,...", value_delimiter = ',', num_args = 1..)]
buf_sizes: Vec<usize>,

/// Thread counts to try, comma-separated values
#[arg(long, value_name = "n,n,n,...", value_delimiter = ',', num_args = 1..)]
compress_threads_nums: Vec<usize>,
}
}

pub fn nr_threads_from_arg(opt_nr: &Option<usize>) -> Result<usize, String> {
Ok(opt_nr.unwrap_or(std::thread::available_parallelism().map_err(|_| "could not get number of processor cores")?.get()))
}
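
For context, a minimal sketch of how nr_threads_from_arg resolves the thread count; the explicit value 8 is illustrative:

// An explicit value wins; None falls back to the detected core count.
let explicit: Option<usize> = Some(8);
assert_eq!(nr_threads_from_arg(&explicit).unwrap(), 8);

// With None, the result equals std::thread::available_parallelism().get().
let detected = nr_threads_from_arg(&None).unwrap();
assert!(detected >= 1);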
123 changes: 66 additions & 57 deletions src/bin/bigarchiver/main.rs
@@ -1,4 +1,4 @@
use bigarchiver::arg_opts::{ArgOpts, Commands};
use bigarchiver::arg_opts::{ArgOpts, Commands, nr_threads_from_arg};
use bigarchiver::{backup, check, timestamp};
use bigarchiver::file_set::cfg_from_pattern;
use bigarchiver::finalizable::DataSink;
@@ -24,50 +24,55 @@ impl DataSink for StdoutWriter {
fn process_args(args: &ArgOpts) -> Result<(), String> {
match &args.command {
Commands::Backup {
out_template, pass, auth, auth_every, split_size, compress_level, buf_size, no_check
out_template, pass, auth, auth_every,
split_size, compress_level, compress_threads, buf_size, no_check
} => {
eprintln!("backing up...");
let nr_threads = nr_threads_from_arg(compress_threads)?;
eprintln!("backing up (using {} threads)...", nr_threads);
let buf_size = *buf_size * 1_048_576;
let split_size = *split_size * 1_048_576;
let auth_every = *auth_every * 1_048_576;
backup(&mut std::io::stdin(),
&auth, auth_every,
split_size, &out_template,
pass, *compress_level, buf_size, None)?;
pass, *compress_level, nr_threads, buf_size, None)?;
if !no_check {
let cfg_path = cfg_from_pattern(&out_template);
eprintln!("verifying...");
check(None::<StdoutWriter>, &cfg_path, pass, buf_size, &None::<&str>, true)
check(None::<StdoutWriter>, &cfg_path, pass, nr_threads, buf_size, &None::<&str>, true)
} else {
Ok(())
}
},

Commands::Restore { config, pass, buf_size, check_free_space, no_check } => {
Commands::Restore { config, pass, decompress_threads, buf_size, check_free_space, no_check } => {
let buf_size = *buf_size * 1_048_576;
let nr_threads = nr_threads_from_arg(decompress_threads)?;
if !no_check {
eprintln!("verifying before restore...");
check(None::<StdoutWriter>, &config, pass, buf_size, &None, true)
eprintln!("verifying before restore (using {} threads)...", nr_threads);
check(None::<StdoutWriter>, &config, pass, nr_threads, buf_size, &None, true)
.map_err(|e| format!("will not restore data, integrity check error: {}", e))?;
}
eprintln!("restoring...");
eprintln!("restoring (using {} threads)...", nr_threads);
let may_be_check = check_free_space.as_ref().map(|s| s.as_str());
check(Some(StdoutWriter{}), &config, pass,
check(Some(StdoutWriter{}), &config, pass, nr_threads,
buf_size, &may_be_check, true)
.map_err(|e| format!("error restoring data: {}", e))
},

Commands::Check { config, pass, buf_size } => {
eprintln!("verifying...");
Commands::Check { config, pass, decompress_threads, buf_size } => {
let nr_threads = nr_threads_from_arg(decompress_threads)?;
eprintln!("verifying (using {} threads)...", nr_threads);
let buf_size = *buf_size * 1_048_576;
check(None::<StdoutWriter>, &config, pass,
check(None::<StdoutWriter>, &config, pass, nr_threads,
buf_size, &None, true)
},

Commands::Bench { out_dir, duration, compress_levels, buf_sizes } => {
Commands::Bench { out_dir, duration, compress_levels, buf_sizes, compress_threads_nums } => {
struct Throughput {
level: u8,
buf_size: usize,
nr_threads: usize,
time_spent_s: u64,
bytes: usize,
bps: usize
@@ -77,55 +82,59 @@ fn process_args(args: &ArgOpts) -> Result<(), String> {

for compress_level in compress_levels {
for buf_size in buf_sizes {
let exit_flag = Arc::new(AtomicBool::new(false));
let exit_flag_clone = exit_flag.clone();
let level = *compress_level;
let buf_size_bytes = *buf_size * 1_048_576;

let base_dir = format!("{}/{}-{}", out_dir, compress_level, buf_size);
let _ = fs::remove_dir_all(&base_dir); // we don't care if it does not exist
fs::create_dir(&base_dir).map_err(|e| format!("could not create directory {}: {}", &base_dir, e))?;

let out_template = format!("{}/%", &base_dir);
let out_cfg = format!("{}/0.cfg", &base_dir);

let ts_start = timestamp();

let thread: thread::JoinHandle<Result<usize, String>> = thread::spawn(move|| {
let bytes = backup(&mut std::io::stdin(),
"auth", 1_048_576,
usize::MAX, &out_template,
"pass", level, buf_size_bytes, Some(exit_flag_clone))?;

check(None::<StdoutWriter>, &out_cfg, "pass", buf_size_bytes, &None::<&str>, false)?;

Ok(bytes)
});

thread::sleep(std::time::Duration::from_millis(*duration as u64 * 1000));
//eprintln!("waking up");
exit_flag.store(true, std::sync::atomic::Ordering::SeqCst);
let bytes = thread.join().unwrap()?;
let ts_end = timestamp();
let ts_delta = ts_end - ts_start;

thrpts.push(Throughput{
level: *compress_level,
buf_size: *buf_size,
time_spent_s: ts_delta,
bytes: bytes,
bps: if ts_delta > 0 { bytes / ts_delta as usize } else { 0 }
});

fs::remove_dir_all(&base_dir).map_err(|e| format!("could not cleanup base directory {}: {}", &base_dir, e))?;
for nr_threads in compress_threads_nums {
let exit_flag = Arc::new(AtomicBool::new(false));
let exit_flag_clone = exit_flag.clone();
let level = *compress_level;
let buf_size_bytes = *buf_size * 1_048_576;
let threads = *nr_threads;

let base_dir = format!("{}/{}-{}-{}", out_dir, compress_level, buf_size, threads);
let _ = fs::remove_dir_all(&base_dir); // we don't care if it does not exist
fs::create_dir_all(&base_dir).map_err(|e| format!("could not create directory {}: {}", &base_dir, e))?;

let out_template = format!("{}/%", &base_dir);
let out_cfg = format!("{}/0.cfg", &base_dir);

let ts_start = timestamp();

let thread: thread::JoinHandle<Result<usize, String>> = thread::spawn(move|| {
let bytes = backup(&mut std::io::stdin(),
"auth", 1_048_576,
usize::MAX, &out_template,
"pass", level, threads, buf_size_bytes, Some(exit_flag_clone))?;

check(None::<StdoutWriter>, &out_cfg, "pass", threads, buf_size_bytes, &None::<&str>, false)?;

Ok(bytes)
});

thread::sleep(std::time::Duration::from_millis(*duration as u64 * 1000));
//eprintln!("waking up");
exit_flag.store(true, std::sync::atomic::Ordering::SeqCst);
let bytes = thread.join().unwrap()?;
let ts_end = timestamp();
let ts_delta = ts_end - ts_start;

thrpts.push(Throughput{
level: *compress_level,
buf_size: *buf_size,
nr_threads: *nr_threads,
time_spent_s: ts_delta,
bytes: bytes,
bps: if ts_delta > 0 { bytes / ts_delta as usize } else { 0 }
});

fs::remove_dir_all(&base_dir).map_err(|e| format!("could not cleanup base directory {}: {}", &base_dir, e))?;
}
}
}

thrpts.sort_by(|a,b| b.bps.cmp(&a.bps));
println!("statistics gathered:");
thrpts.into_iter().for_each(|t| {
println!("speed = {} b/s\tbytes = {}\tseconds = {}\tlevel = {}\tbuffer = {} MB\t",
t.bps, t.bytes, t.time_spent_s, t.level, t.buf_size);
println!("speed = {} b/s\tbytes = {}\tthreads = {}\tseconds = {}\tlevel = {}\tbuffer = {} MB\t",
t.bps, t.bytes, t.nr_threads, t.time_spent_s, t.level, t.buf_size);
});

Ok(())
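
With the new threads column, each line of the final statistics dump looks roughly like this (values are illustrative; speed is bytes divided by seconds):

speed = 52428800 b/s	bytes = 524288000	threads = 4	seconds = 10	level = 6	buffer = 32 MB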
36 changes: 25 additions & 11 deletions src/comp_decomp_2.rs
@@ -1,5 +1,6 @@
use std::io::Write;
use liblzma::write::{XzEncoder, XzDecoder};
use liblzma::stream::MtStreamBuilder;
use crate::finalizable::DataSink;

pub struct Conv<'a, T: DataSink> {
@@ -26,10 +27,16 @@ pub struct Compressor2<'a, T: DataSink> {
enc: XzEncoder<Conv<'a, T>>
}


impl<'a, T: DataSink> Compressor2<'a, T> {
pub fn new(to: &'a mut T, level: u32) -> Compressor2<'a, T> {
Compressor2 { enc: XzEncoder::new(Conv{ t: to }, level) }
pub fn new(to: &'a mut T, level: u32, nr_threads: u32) -> Result<Compressor2<'a, T>, String> {
if nr_threads > 1 {
let mut bld = MtStreamBuilder::new();
bld.preset(level).threads(nr_threads);
let stream = bld.encoder().map_err(|e| format!("could not create multi-threaded LZMA encoder: {}", e))?;
Ok( Compressor2 { enc: XzEncoder::new_stream(Conv{ t: to }, stream) } )
} else {
Ok( Compressor2 { enc: XzEncoder::new(Conv{ t: to }, level) } )
}
}

#[allow(dead_code)]
@@ -45,7 +52,6 @@ impl<'a, T: DataSink> Compressor2<'a, T> {

impl<'a, T: DataSink> DataSink for Compressor2<'a, T> {
fn add(&mut self, data: &[u8]) -> Result<(), String> {
//eprintln!("Compressor: writing {} bytes", data.len());
self.enc.write_all(data).map_err(|e| format!("write all error: {}", e))
}
fn finish(&mut self) -> Result<(), String> {
@@ -61,8 +67,16 @@ pub struct Decompressor2<'a, T: DataSink> {
}

impl<'a, T: DataSink> Decompressor2<'a, T> {
pub fn new(to: &'a mut T) -> Decompressor2<'a, T> {
Decompressor2 { dec: XzDecoder::new(Conv{ t: to }) }
pub fn new(to: &'a mut T, nr_threads: u32) -> Result<Decompressor2<'a, T>, String> {
if nr_threads > 1 {
let mut bld = MtStreamBuilder::new();
bld.threads(nr_threads);
let mut stream = bld.decoder().map_err(|e| format!("could not create multi-threaded LZMA decoder: {}", e))?;
stream.set_memlimit(1024 * 1024 * 1024).unwrap(); // FIXME provide some way to set compressor/decompressor limits (i.e. from cmd args)
Ok( Decompressor2 { dec: XzDecoder::new_stream(Conv{ t: to }, stream) } )
} else {
Ok( Decompressor2 { dec: XzDecoder::new(Conv{ t: to }) } )
}
}

#[allow(dead_code)]
@@ -113,14 +127,14 @@ mod tests {
#[test]
fn zip_unzip_small_2() {
let mut sink_for_zipped = Sink{ data: Vec::new() };
let mut comp = Compressor2::new(&mut sink_for_zipped, 8);
let mut comp = Compressor2::new(&mut sink_for_zipped, 8, 4).unwrap();
comp.add(b"HELLO").unwrap();
comp.finish().unwrap();
let data = &comp.enc.get_ref().t.data;
eprintln!("{} bytes: {:?}", data.len(), data);

let mut sink_for_unzipped = Sink{ data: Vec::new() };
let mut decomp = Decompressor2::new(&mut sink_for_unzipped);
let mut decomp = Decompressor2::new(&mut sink_for_unzipped, 4).unwrap();
decomp.add(&data.clone()).unwrap();
decomp.finish().unwrap();
let orig_data = &decomp.dec.get_ref().t.data;
Expand Down Expand Up @@ -148,7 +162,7 @@ mod tests {
thread_rng().fill_bytes(&mut src);

let mut sink_for_zipped = Sink{ data: Vec::new() };
let mut comp = Compressor2::new(&mut sink_for_zipped, 9);
let mut comp = Compressor2::new(&mut sink_for_zipped, 9, 4).unwrap();
//comp.add(&src).unwrap();
add_by_random_parts(&mut comp, &src, 512);
//eprintln!("could write {} bytes to compressor", written);
@@ -157,7 +171,7 @@ mod tests {
eprintln!("{} bytes -> {} bytes", src.len(), data.len());

let mut sink_for_unzipped = Sink{ data: Vec::new() };
let mut decomp = Decompressor2::new(&mut sink_for_unzipped);
let mut decomp = Decompressor2::new(&mut sink_for_unzipped, 4).unwrap();
//decomp.add(&data.clone()).unwrap();
add_by_random_parts(&mut decomp, &data.clone(), 512);
//eprintln!("could write {} bytes to decompressor", written);
@@ -180,7 +194,7 @@ mod tests {
buf.resize(SEND_SIZE, 0);
let mut null_sink = NullSink{};
let mut count = 0;
let mut comp = Compressor2::new(&mut null_sink, 6);
let mut comp = Compressor2::new(&mut null_sink, 6, 6).unwrap();
while !is_stop_copy.load(Ordering::SeqCst) {
thread_rng().fill_bytes(&mut buf);
comp.add(&buf).unwrap();
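
For reference, a self-contained sketch of the multi-threaded liblzma round trip that Compressor2 and Decompressor2 wrap above, writing into a plain Vec<u8> instead of a DataSink; the level, thread count and 64 MiB memlimit are illustrative, and the builder calls mirror the ones in this file:

use std::io::Write;
use liblzma::write::{XzEncoder, XzDecoder};
use liblzma::stream::MtStreamBuilder;

fn mt_roundtrip() -> Result<(), String> {
    // Multi-threaded encoder: the preset carries the compression level.
    let mut enc_bld = MtStreamBuilder::new();
    enc_bld.preset(6).threads(4);
    let enc_stream = enc_bld.encoder().map_err(|e| format!("encoder: {}", e))?;
    let mut enc = XzEncoder::new_stream(Vec::new(), enc_stream);
    enc.write_all(b"HELLO").map_err(|e| format!("write: {}", e))?;
    let compressed = enc.finish().map_err(|e| format!("finish: {}", e))?;

    // Multi-threaded decoder with an explicit memory limit.
    let mut dec_bld = MtStreamBuilder::new();
    dec_bld.threads(4);
    let mut dec_stream = dec_bld.decoder().map_err(|e| format!("decoder: {}", e))?;
    dec_stream.set_memlimit(64 * 1024 * 1024).map_err(|e| format!("memlimit: {}", e))?;
    let mut dec = XzDecoder::new_stream(Vec::new(), dec_stream);
    dec.write_all(&compressed).map_err(|e| format!("write: {}", e))?;
    let plain = dec.finish().map_err(|e| format!("finish: {}", e))?;
    assert_eq!(plain, b"HELLO");
    Ok(())
}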
8 changes: 4 additions & 4 deletions src/lib.rs
@@ -59,7 +59,7 @@ fn time_str() -> String {
pub fn backup<R: Read>(
mut read_from: R,
auth: &str, auth_every_bytes: usize, split_size_bytes: usize, out_template: &str,
pass: &str, compress_level: u8, buf_size_bytes: usize, exit_flag: Option<Arc<AtomicBool>>) -> Result<usize, String>
pass: &str, compress_level: u8, nr_threads: usize, buf_size_bytes: usize, exit_flag: Option<Arc<AtomicBool>>) -> Result<usize, String>
{
let hash_seed = timestamp();
let start_time_str = time_str();
@@ -75,7 +75,7 @@ pub fn backup<R: Read>(
{
let enc = Encryptor::new(&mut spl, pass, auth);
let mut fbuf = FixedSizeWriter::new(enc, auth_every_bytes);
let mut comp = Compressor2::new(&mut fbuf, compress_level as u32);
let mut comp = Compressor2::new(&mut fbuf, compress_level as u32, nr_threads as u32)?;
{
let mut hash_copier = DataHasher::with_writer(Some(&mut comp), hash_seed);

@@ -104,7 +104,7 @@
}


pub fn check<W: DataSink>(mut write_to: Option<W>, cfg_path: &str, pass: &str, buf_size_bytes: usize, check_free_space: &Option<&str>, show_info: bool) -> Result<(), String> {
pub fn check<W: DataSink>(mut write_to: Option<W>, cfg_path: &str, pass: &str, nr_threads: usize, buf_size_bytes: usize, check_free_space: &Option<&str>, show_info: bool) -> Result<(), String> {
let stats = read_metadata::<MultiFilesReader>(cfg_path)?;
if show_info {
eprintln!("authentication string: {}", stats.auth_string);
@@ -122,7 +122,7 @@ pub fn check<W: DataSink>(mut write_to: Option<W>, cfg_path: &str, pass: &str, b

let mut hash_copier = DataHasher::with_writer(ref_write_to, stats.hash_seed.unwrap());
{
let mut decomp = Decompressor2::new(&mut hash_copier);
let mut decomp = Decompressor2::new(&mut hash_copier, nr_threads as u32)?;
let dec = Decryptor::new(&mut decomp, pass, &stats.auth_string);
let mut fbuf = FixedSizeWriter::new(dec, stats.auth_chunk_size + 16);
let fmgr = MultiFilesReader::new();
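
To show what the widened signatures mean for callers, a hedged sketch in the spirit of main.rs above; StdoutWriter is the sink type from that file, and the output paths, level and buffer size are hypothetical:

let nr_threads = nr_threads_from_arg(&None)?;        // auto-detect CPU cores
let bytes = backup(&mut std::io::stdin(),
    "auth", 1_048_576,                               // auth string, auth every 1 MB
    usize::MAX, "/tmp/backup/%",                     // no splitting, hypothetical template
    "pass", 6, nr_threads,                           // password, level, threads (new arg)
    4 * 1_048_576, None)?;                           // 4 MB buffer, no exit flag
eprintln!("backed up {} bytes", bytes);
check(None::<StdoutWriter>, "/tmp/backup/0.cfg", "pass",
    nr_threads,                                      // threads (new arg), mirrors backup
    4 * 1_048_576, &None::<&str>, true)?;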
