Skip to content

Commit

Permalink
stream: Streaming API for decompression
Browse files Browse the repository at this point in the history
Create a new struct `Stream` that uses the `std::io::Write` interface
to read chunks of compressed data and write them to an output sink.

Add a streaming mode so processing can work with streaming chunks of
data. This is required because process() assumed the input reader
contained a complete stream.

Update flags and try_process_next() were added to handle when the
decompressor requests more input bytes than are available. Data is
temporarily buffered in the DecoderState if more input bytes are
required to make progress.

This commit also adds utility functions to the rangecoder for working
with streaming data.

Adds an allow_incomplete option to disable end of stream checks when
calling `finish()` on a stream. This is because some users may want
to retrieve partially decompressed data.
  • Loading branch information
cccs-sadugas committed Oct 5, 2020
1 parent 04503f3 commit 7834b5d
Show file tree
Hide file tree
Showing 13 changed files with 1,058 additions and 149 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env_logger = { version = "^0.7.1", optional = true }

[features]
enable_logging = ["env_logger", "log"]
stream = []

[badges]
travis-ci = { repository = "gendx/lzma-rs", branch = "master" }
26 changes: 26 additions & 0 deletions benches/lzma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,31 @@ fn decompress_bench(compressed: &[u8], b: &mut Bencher) {
});
}

#[cfg(feature = "stream")]
fn decompress_stream_bench(compressed: &[u8], b: &mut Bencher) {
use std::io::Write;
b.iter(|| {
let mut stream = lzma_rs::decompress::Stream::new(Vec::new());
stream.write_all(compressed).unwrap();
stream.finish().unwrap()
});
}

fn decompress_bench_file(compfile: &str, b: &mut Bencher) {
let mut f = std::fs::File::open(compfile).unwrap();
let mut compressed = Vec::new();
f.read_to_end(&mut compressed).unwrap();
decompress_bench(&compressed, b);
}

#[cfg(feature = "stream")]
fn decompress_stream_bench_file(compfile: &str, b: &mut Bencher) {
let mut f = std::fs::File::open(compfile).unwrap();
let mut compressed = Vec::new();
f.read_to_end(&mut compressed).unwrap();
decompress_stream_bench(&compressed, b);
}

#[bench]
fn compress_empty(b: &mut Bencher) {
#[cfg(feature = "enable_logging")]
Expand Down Expand Up @@ -90,6 +108,14 @@ fn decompress_big_file(b: &mut Bencher) {
decompress_bench_file("tests/files/foo.txt.lzma", b);
}

#[cfg(feature = "stream")]
#[bench]
fn decompress_stream_big_file(b: &mut Bencher) {
#[cfg(feature = "enable_logging")]
let _ = env_logger::try_init();
decompress_stream_bench_file("tests/files/foo.txt.lzma", b);
}

#[bench]
fn decompress_huge_dict(b: &mut Bencher) {
#[cfg(feature = "enable_logging")]
Expand Down
18 changes: 15 additions & 3 deletions src/decode/lzbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ where
fn get_output(&self) -> &W;
// Get a mutable reference to the output sink
fn get_output_mut(&mut self) -> &mut W;
// Flush the buffer to the output
// Consumes this buffer and flushes any data
fn finish(self) -> io::Result<W>;
// Consumes this buffer without flushing any data
fn into_output(self) -> W;
}

// An accumulating buffer for LZ sequences
Expand Down Expand Up @@ -143,12 +145,17 @@ where
&mut self.stream
}

// Flush the buffer to the output
// Consumes this buffer and flushes any data
fn finish(mut self) -> io::Result<W> {
self.stream.write_all(self.buf.as_slice())?;
self.stream.flush()?;
Ok(self.stream)
}

// Consumes this buffer without flushing any data
fn into_output(self) -> W {
self.stream
}
}

// A circular buffer for LZ sequences
Expand Down Expand Up @@ -291,12 +298,17 @@ where
&mut self.stream
}

// Flush the buffer to the output
// Consumes this buffer and flushes any data
fn finish(mut self) -> io::Result<W> {
if self.cursor > 0 {
self.stream.write_all(&self.buf[0..self.cursor])?;
self.stream.flush()?;
}
Ok(self.stream)
}

// Consumes this buffer without flushing any data
fn into_output(self) -> W {
self.stream
}
}
Loading

0 comments on commit 7834b5d

Please sign in to comment.