[zstd_stream] Don't block in reader.Read if a zstd block is available #96

Merged · 2 commits · Nov 24, 2021
108 changes: 64 additions & 44 deletions zstd_stream.go
@@ -421,29 +421,66 @@ func (r *reader) Read(p []byte) (int, error) {
 		return 0, r.firstError
 	}

-	// If we already have enough bytes, return
-	if r.decompSize-r.decompOff >= len(p) {
-		copy(p, r.decompressionBuffer[r.decompOff:])
-		r.decompOff += len(p)
-		return len(p), nil
+	if len(p) == 0 {
+		return 0, nil
 	}

+	// If we already have some uncompressed bytes, return without blocking
+	if r.decompSize > r.decompOff {
+		if r.decompSize-r.decompOff > len(p) {
+			copy(p, r.decompressionBuffer[r.decompOff:])
+			r.decompOff += len(p)
+			return len(p), nil
+		}
+		// From https://golang.org/pkg/io/#Reader
+		// > Read conventionally returns what is available instead of waiting for more.
+		copy(p, r.decompressionBuffer[r.decompOff:r.decompSize])
+		got := r.decompSize - r.decompOff
+		r.decompOff = r.decompSize
+		return got, nil
+	}

-	copy(p, r.decompressionBuffer[r.decompOff:r.decompSize])
-	got := r.decompSize - r.decompOff
 	r.decompSize = 0
 	r.decompOff = 0

-	for got < len(p) {
-		// Populate src
-		src := r.compressionBuffer
-		reader := r.underlyingReader
-		n, err := TryReadFull(reader, src[r.compressionLeft:])
-		if err != nil && err != errShortRead { // Handle underlying reader errors first
-			return 0, fmt.Errorf("failed to read from underlying reader: %s", err)
-		} else if n == 0 && r.compressionLeft == 0 {
-			return got, io.EOF
+	// Repeatedly read from the underlying reader until we get
+	// at least one zstd block, so that we don't block if the
+	// other end has flushed a block.
+	for {
+		// - If the last decompression didn't entirely fill the decompression buffer,
+		//   zstd flushed all it could, and needs new data. In that case, do 1 Read.
+		// - If the last decompression did entirely fill the decompression buffer,
+		//   it might have needed more room to decompress the input. In that case,
+		//   don't do any unnecessary Read that might block.
+		needsData := r.decompSize < len(r.decompressionBuffer)

+		var src []byte
+		if !needsData {
+			src = r.compressionBuffer[:r.compressionLeft]
+		} else {
+			src = r.compressionBuffer
+			var n int
+			var err error
+			// Read until data arrives or an error occurs.
+			for n == 0 && err == nil {
+				n, err = r.underlyingReader.Read(src[r.compressionLeft:])
+			}
+			if err != nil && err != io.EOF { // Handle underlying reader errors first
+				return 0, fmt.Errorf("failed to read from underlying reader: %s", err)
+			}
+			if n == 0 {
+				// Ideally, we'd return io.ErrUnexpectedEOF in all cases where the stream was unexpectedly EOF'd
+				// during a block or frame, i.e. when there is incomplete, pending compressed data.
+				// However, it's hard to detect those cases with zstd. Namely, there is no way to know the size of
+				// the currently buffered compressed data in the zstd stream's internal buffers.
+				// Best effort: return io.ErrUnexpectedEOF if we still have some buffered compressed data that
+				// zstd doesn't want to accept.
+				// If we don't have any buffered compressed data but zstd still has some in its internal buffers,
+				// we will return io.EOF instead.
+				if r.compressionLeft > 0 {
+					return 0, io.ErrUnexpectedEOF
+				}
+				return 0, io.EOF
Collaborator:
So I think you are correct that if r.decompSize == len(r.decompressionBuffer), zstd should output a decompressed block without additional input (unless its estimate was wrong, in which case the next loop iteration will ask for more data).

But I don't think the opposite follows: r.decompSize < len(r.decompressionBuffer) does not mean that no output will be produced without additional input. From the documentation (https://github.com/DataDog/zstd/blob/1.x/zstd.h#L777), a previous input can keep producing output blocks even when no additional input is given (and retCode is only a hint). So I think it would be safer to defer returning io.EOF until after the actual call into C zstd, when err == io.EOF and compressionLeft == 0 (no compressed data left).

You might have more background than me on non-EOFed pipes, but my understanding is that we should return EOF only when the pipe will never provide data again.

Collaborator:
I'm initially thinking of changing the condition to:

remainingOut := r.decompSize - r.decompOff
if err == io.EOF && r.compressionLeft == 0 && remainingOut == 0 {

Contributor (author):
My understanding is that if r.decompSize < len(r.decompressionBuffer), i.e. if output.pos < output.size, the decoder flushed all it could (see https://github.com/DataDog/zstd/blob/1.x/zstd.h#L774); in other words, the decoder needs more data before it can output anything else.

The line you linked refers to a special condition that applies when len(r.decompressionBuffer) > ZSTD_BLOCKSIZE_MAX. That will never be the case here, because the decompression buffer is allocated by a pool to a size of ZSTD_DStreamOutSize, which is == ZSTD_BLOCKSIZE_MAX. (That is the purpose of using ZSTD_DStreamOutSize().)

So the condition looks correct to me as is. What do you think?

Collaborator:
Indeed! I had missed that ZSTD_DStreamOutSize == ZSTD_BLOCKSIZE_MAX, but you are correct.
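To make the condition the thread settles on concrete, here is a minimal, self-contained sketch (plain Go, no cgo). It mirrors the diff's needsData condition; needsData here is a standalone illustrative function, and outPos/outSize are hypothetical stand-ins for ZSTD_decompressStream's output.pos and output.size, not identifiers from this package:

```go
package main

import "fmt"

// Sketch of the input policy the thread converges on, assuming the output
// buffer is exactly ZSTD_DStreamOutSize == ZSTD_BLOCKSIZE_MAX bytes.
func needsData(outPos, outSize int) bool {
	// outPos < outSize: the decoder flushed all it could, so it cannot
	// produce more output until more compressed input is read.
	// outPos == outSize: the output buffer may simply have been too small;
	// retry decompression before doing a potentially blocking Read.
	return outPos < outSize
}

func main() {
	fmt.Println(needsData(100, 128)) // true: do one (possibly blocking) Read
	fmt.Println(needsData(128, 128)) // false: call zstd again with leftover input
}
```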

+			}
+			src = src[:r.compressionLeft+n]
+		}
-		src = src[:r.compressionLeft+n]

 		// C code
 		var srcPtr *byte // Do not point anywhere, if src is empty
@@ -461,9 +498,9 @@ func (r *reader) Read(p []byte) (int, error) {
 	)
 	retCode := int(r.resultBuffer.return_code)

-	// Keep src here eventhough we reuse later, the code might be deleted at some point
+	// Keep src here even though we reuse later, the code might be deleted at some point
 	runtime.KeepAlive(src)
-	if err = getError(retCode); err != nil {
+	if err := getError(retCode); err != nil {
 		return 0, fmt.Errorf("failed to decompress: %s", err)
 	}

@@ -473,10 +510,9 @@ func (r *reader) Read(p []byte) (int, error) {
 		left := src[bytesConsumed:]
 		copy(r.compressionBuffer, left)
 	}
-	r.compressionLeft = len(src) - int(bytesConsumed)
+	r.compressionLeft = len(src) - bytesConsumed
 	r.decompSize = int(r.resultBuffer.bytes_written)
-	r.decompOff = copy(p[got:], r.decompressionBuffer[:r.decompSize])
-	got += r.decompOff
+	r.decompOff = copy(p, r.decompressionBuffer[:r.decompSize])

 		// Resize buffers
 		nsize := retCode // Hint for next src buffer size

@@ -488,25 +524,9 @@ func (r *reader) Read(p []byte) (int, error) {
 			nsize = r.compressionLeft
 		}
 		r.compressionBuffer = resize(r.compressionBuffer, nsize)
-	}
-	return got, nil
-}
-
-// TryReadFull reads buffer just as ReadFull does
-// Here we expect that buffer may end and we do not return ErrUnexpectedEOF as ReadAtLeast does.
-// We return errShortRead instead to distinguish short reads and failures.
-// We cannot use ReadFull/ReadAtLeast because it masks Reader errors, such as network failures
-// and causes panic instead of error.
-func TryReadFull(r io.Reader, buf []byte) (n int, err error) {
-	for n < len(buf) && err == nil {
-		var nn int
-		nn, err = r.Read(buf[n:])
-		n += nn
-	}
-	if n == len(buf) && err == io.EOF {
-		err = nil // EOF at the end is somewhat expected
-	} else if err == io.EOF {
-		err = errShortRead
+
+		if r.decompOff > 0 {
+			return r.decompOff, nil
+		}
Collaborator:
Thinking also about an input stream that got accidentally cut off (so it starts returning EOF while we still have some partial zstd data), we could return io.ErrUnexpectedEOF (https://golang.org/pkg/io/#pkg-variables):

> Functions should return EOF only to signal a graceful end of input. If the EOF occurs unexpectedly in a structured data stream, the appropriate error is either ErrUnexpectedEOF or some other error giving more detail.
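A hedged sketch of the failure mode under discussion, using only the package's public streaming API (NewWriter/NewReader). Which error surfaces depends on where the cut lands relative to zstd's internal buffering, per the best-effort comment in the diff:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/DataDog/zstd"
)

func main() {
	// Build a complete compressed stream, then cut it off mid-frame.
	var buf bytes.Buffer
	w := zstd.NewWriter(&buf)
	w.Write(bytes.Repeat([]byte("hello zstd "), 1000))
	w.Close()
	truncated := buf.Bytes()[:buf.Len()/2]

	r := zstd.NewReader(bytes.NewReader(truncated))
	defer r.Close()

	_, err := io.ReadAll(r)
	// Best effort, per the diff: io.ErrUnexpectedEOF if the reader still holds
	// compressed bytes zstd didn't consume; otherwise the truncation is
	// indistinguishable from a graceful end of stream, and io.ReadAll reports
	// err == nil because it swallows the plain io.EOF.
	fmt.Println(err)
}
```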

 	}
-	return
 }
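Taken together, the new loop lets a reader make progress as soon as the peer flushes a block. A minimal sketch of that behaviour, mirroring the TestStreamFlush test below (with the pre-PR TryReadFull-based loop, the Read would block trying to fill the whole internal compression buffer):

```go
package main

import (
	"fmt"
	"os"

	"github.com/DataDog/zstd"
)

func main() {
	pr, pw, err := os.Pipe() // buffered; no EOF while the write end stays open
	if err != nil {
		panic(err)
	}
	defer pw.Close()
	defer pr.Close()

	w := zstd.NewWriter(pw)
	r := zstd.NewReader(pr)
	defer r.Close()

	payload := "cc" // too short for zstd to flush it on its own
	w.Write([]byte(payload))
	w.Flush() // force the pending block onto the pipe

	buf := make([]byte, len(payload))
	n, _ := r.Read(buf) // with this PR: returns the flushed bytes instead of blocking
	fmt.Printf("%d %q\n", n, buf[:n])
}
```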
17 changes: 12 additions & 5 deletions zstd_stream_test.go
@@ -39,7 +39,7 @@ func testCompressionDecompression(t *testing.T, dict []byte, payload []byte) {
 	// Decompress
 	r := NewReaderDict(rr, dict)
 	dst := make([]byte, len(payload))
-	n, err := r.Read(dst)
+	n, err := io.ReadFull(r, dst)
 	if err != nil {
 		failOnError(t, "Failed to read for decompression", err)
 	}
@@ -211,9 +211,16 @@ func TestStreamEmptyPayload(t *testing.T) {
 }

 func TestStreamFlush(t *testing.T) {
-	var w bytes.Buffer
-	writer := NewWriter(&w)
-	reader := NewReader(&w)
+	// use an actual os pipe so that
+	// - it's buffered and we don't get a 1-read = 1-write behaviour (io.Pipe)
+	// - reading doesn't send EOF when we're done reading the buffer (bytes.Buffer;
+	//   see the sketch after this hunk)
+	pr, pw, err := os.Pipe()
+	failOnError(t, "Failed creating pipe", err)
+	defer pw.Close()
+	defer pr.Close()
+
+	writer := NewWriter(pw)
+	reader := NewReader(pr)

 	payload := "cc" // keep the payload short to make sure it will not be automatically flushed by zstd
 	buf := make([]byte, len(payload))
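The comment above is the crux of the test change: a drained bytes.Buffer reports io.EOF even though more data may be written to it later, which the zstd reader would take as end of stream, while io.Pipe is unbuffered so each Read is coupled to a pending Write. A minimal illustration of the bytes.Buffer half, using only the standard library:

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	var b bytes.Buffer
	b.WriteString("x")

	buf := make([]byte, 4)
	n, err := b.Read(buf)
	fmt.Println(n, err) // 1 <nil>

	n, err = b.Read(buf)
	fmt.Println(n, err) // 0 EOF: drained, even though a later Write could add more data
}
```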
@@ -429,7 +436,7 @@ func BenchmarkStreamDecompression(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		rr := bytes.NewReader(compressed)
 		r := NewReader(rr)
-		_, err := r.Read(dst)
+		_, err := io.ReadFull(r, dst)
 		if err != nil {
 			b.Fatalf("Failed to decompress: %s", err)
 		}