Streaming Decompressor v4 #59
Conversation
Force-pushed from 7834b5d to 6406291.
Create a new struct `Stream` that uses the `std::io::Write` interface to read chunks of compressed data and write them to an output sink. Add a streaming mode so processing can work on chunks of streaming data; this is required because `process()` assumed the input reader contained a complete stream. Update flags and `try_process_next()` were added to handle the case where the decompressor requests more input bytes than are available. Data is temporarily buffered in the `DecoderState` if more input bytes are required to make progress. This commit also adds utility functions to the rangecoder for working with streaming data, plus an `allow_incomplete` option to disable end-of-stream checks when calling `finish()` on a stream, since some users may want to retrieve partially decompressed data.
Force-pushed from 6406291 to 2126c78.
```rust
// Fill as much of the tmp buffer as possible
let start = self.partial_input_buf.position() as usize;
let bytes_read =
    rangecoder.read_into(&mut self.partial_input_buf.get_mut()[start..])? as u64;
```
CI was failing because Rust 1.32 doesn't have `std::convert::TryFrom`. I think the cast to `u64` is safe here because of the invariant `bytes_read <= MAX_REQUIRED_INPUT`.
Looks good to me.
Benchmarks are:
I'll re-review the changes and perform benchmarks locally to confirm the performance impact. From your results, it seems that there could be a slight performance regression even in non-streaming mode, maybe due to the changes in the range coder. I hope to get back with a more complete reply in the next few days.
@gendx: Do you have time to revisit this? ruffle-rs/ruffle#405 (comment) indicates a need for streaming support prior to inclusion. Ruffle is getting more attention due to next month's demise of the Flash browser plugin. There isn't that much lzma-encoded Flash content - gzip remained the default - but it exists, and it'd be nice for Ruffle to handle it, as well as helping the projects mentioned in #10.
@gendx any updates on this pull request?

I agree that the slight performance regression may be caused by the changes in the range coder. The first version of my pull request avoided this issue by writing two versions of the processing code.

I ran the benchmarks once again today. They show a larger 8-12% difference for the smaller tests and a lower 3% difference for the big file test. Larger input may have relatively better performance because the initial setup costs are amortized.
I generated flamegraphs and attached them to this thread. N is the number of test iterations.

Flamegraph analysis: I tried doing some micro-optimizations by moving branches around, but overall this did not create a significant performance increase. On the positive side, the streaming decompressor is more CPU- and memory-efficient :).
Sorry again for the delay in reviewing this.
Overall this should be good to go as an experimental streaming feature. I left some minor comments for reference, which can be fixed separately after merging.
Regarding performance, the minor regression is not that large, at least when the feature is off. But further PRs are welcome to improve performance if needed.
However, I'd like to experiment with an `async` implementation in the future, e.g. over `futures::io::AsyncRead` and `futures::io::AsyncWrite`, which should provide a more seamless way of streaming without having to implement the state machine for buffering within lzma-rs, and in particular without having to pass this `update` boolean throughout the implementation.

Other improvements could be to make `RangeDecoder` a trait and integrate the `update` bit into it (i.e. having `UpdatingRangeDecoder` and `ObservingRangeDecoder` that implement the `RangeDecoder` trait), or to leverage const generics to store the update bit as a const-generic boolean.
```rust
use std::io::Write;

/// Utility function to read a file into memory
fn read_to_file(filename: &str) -> std::io::Result<Vec<u8>> {
```
This would better be named `read_from_file` or `read_all_file`.
```rust
let n = compfile.read(&mut tmp).unwrap();
stream.write_all(&tmp[0..n]).unwrap();

if n == 0 {
```
Break should be just after the function call.
```rust
    }
}

fn assert_decomp_eq(input: &[u8], expected: &[u8]) {
```
There's some code duplication with the previous function.
```rust
/// Tells the decompressor if we should expect more data after parsing the
/// current input.
#[derive(Debug, PartialEq)]
pub enum ProcessingMode {
```
This shouldn't be pub.
```diff
-let props = input
-    .read_u8()
-    .map_err(|e| error::Error::LZMAError(format!("LZMA header too short: {}", e)))?;
+let props = input.read_u8().map_err(error::Error::HeaderTooShort)?;
```
Does it work when streaming in chunks of 1 byte?
```rust
let mut stream = lzma_rs::decompress::Stream::new(Vec::new());

// read file in chunks
let mut tmp = [0u8; 1024];
```
This should test chunks of 1 byte as well.
```rust
&& (self.partial_input_buf.position() as usize) < MAX_REQUIRED_INPUT
&& self
    .try_process_next(
        &tmp[0..self.partial_input_buf.position() as usize],
```
Unneeded zero at the beginning of the range. Same below.
```rust
// Fill as much of the tmp buffer as possible
let start = self.partial_input_buf.position() as usize;
let bytes_read =
    rangecoder.read_into(&mut self.partial_input_buf.get_mut()[start..])? as u64;
```
Looks good to me.
```rust
    Ok(())
}

pub fn process_mode<'a, R: io::BufRead>(
```
This shouldn't be pub.
Thanks for the review and merge :). I will look at the comments in the near future. Is there anything else required to publish the added features to crates.io?

I agree that the async traits could be useful and are worth looking into. Some projects using this crate may not have an async runtime or may depend on using the synchronous interface.
Thanks also a lot for your patience, and sorry again for the reviewing delay, this was a big change and the last months have been quite busy on my end.
The plan would be to have 3 implementations:
From a maintenance point of view, keeping only one of (2) and (3) would be preferable in the long term (depending on actual adoption), but on the other hand it's indeed not always possible/easy to have users pass an async reader or writer.
For now, the streaming implementation doesn't work for some combinations of the test cases and the streaming chunk size, as evidenced by #63, so fixing that is still required before publishing a new release on crates.io.
Pull Request Overview
Changes since last PR (bullet list reconstructed from a flattened scrape; some leading words per item were lost):

- `loop`
- `RangeDecoder::remaining` function
- `u64`
- `Mode` to `ProcessingMode`, and added a `ProcessingStatus` instead of using a `bool`
- `tmp` to `partial_input_buf` to make it more clear
- `RangeDecoder` members public instead of having getters
- `unpacked_size_write_none_to_header_and_use_provided_on_read` causes a `WriteZero` error
- `Vec<u8>`
- `MAX_REQUIRED_INPUT` is 20 bytes
- `read_partial_input_buf`
Testing Strategy
This pull request was tested by... (`.lzma`, `.lzma2`, `.xz` files).

Supporting Documentation and References
Link to previous PR: #58