Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[zstd_stream] Don't block in reader.Read if a zstd block is available #96

Merged
merged 2 commits into from
Nov 24, 2021

Conversation

delthas
Copy link
Contributor

@delthas delthas commented Mar 2, 2021

reader.Read used to try to fully read an internal buffer until EOF or
the buffer was filled. That was buffer was set to ZSTD_DStreamInSize,
which is larger than any zstd block.

This means that reader.Read could try to buffer much more data than
what was needed to process and return a single block from the Read
method.

This was an issue because we could miss an urgent Flush from a
corresponding Writer by blocking. (A typical use case is instant
messaging.) It was also against the general convention of io.Read that a
single call should return any immediately available data without
blocking, if any.

Interestingly enough, the test case should have caught this up, but
because we used a bytes.Buffer, the Read returned io.EOF after reading
the entirety of the buffer, even if we appended to the buffer later on.
The test case is also fixed by this commit.

Fixes: #95

@delthas
Copy link
Contributor Author

delthas commented Mar 2, 2021

It could be interesting to run some benchmarks to see if that impacts performance, and if yes, to take a decision on whether that should be the default or flag-dependent.

@delthas delthas marked this pull request as draft March 2, 2021 22:12
@delthas
Copy link
Contributor Author

delthas commented Mar 30, 2021

Benchmarks:
Before (modified the Decompression test to use io.ReadAll so that we test the same stuff as After):

goos: windows
goarch: amd64
pkg: github.com/DataDog/zstd
cpu: Intel(R) Core(TM) i7-3820 CPU @ 3.60GHz
BenchmarkStreamCompression
BenchmarkStreamCompression-8     	     248	 143712750 ns/op	  69.38 MB/s
BenchmarkStreamDecompression
BenchmarkStreamDecompression-8   	    2014	  17949798 ns/op	 555.47 MB/s
PASS

After:

goos: windows
goarch: amd64
pkg: github.com/DataDog/zstd
cpu: Intel(R) Core(TM) i7-3820 CPU @ 3.60GHz
BenchmarkStreamCompression
BenchmarkStreamCompression-8     	     244	 144691207 ns/op	  68.91 MB/s
BenchmarkStreamDecompression
BenchmarkStreamDecompression-8   	    1989	  18085000 ns/op	 551.32 MB/s
PASS

No significant change for compression (as expected), ~0.7% slowdown for decompression. IMO no need to add a flag for that, the performance difference is OK.

@delthas delthas marked this pull request as ready for review March 30, 2021 10:19
@delthas
Copy link
Contributor Author

delthas commented Mar 30, 2021

Ready for review @Viq111

@delthas
Copy link
Contributor Author

delthas commented May 24, 2021

Gentle ping 😃

@Viq111
Copy link
Collaborator

Viq111 commented Jun 1, 2021

Hi! Sorry I dropped the ball on this one so that's totally on me.
I was out last week but I'm back this week so I'll be able to give it a look.

Copy link
Collaborator

@Viq111 Viq111 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! It definitely makes a lot of sense
Rereading the stream decompression path, I think we can handle / document a bit better all the EOF cases

zstd_stream_test.go Show resolved Hide resolved
zstd_stream.go Show resolved Hide resolved
return 0, fmt.Errorf("failed to read from underlying reader: %s", err)
}
if n == 0 {
return 0, io.EOF
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think you are correct that if r.decompSize == len(r.decompressionBuffer) then zstd should output a decompressed block without additional input (except if its estimation was not correct but in that case, next loop it will ask more data).
But I don't think it means the opposite is correct though: i.e: r.decompSize < len(r.decompressionBuffer) does not mean without any additional input, not output will be produced. From the documentation (https://github.com/DataDog/zstd/blob/1.x/zstd.h#L777), a previous output could actually produce many blocks even if no additional input is given (and retCode is only a hint) so I think it would be safer to defer the return of io.EOF after the actual call to C zstd if err == EOF and compressionLeft == 0 (no compression left)

You might have more background than me on non-EOFed pipes but my understanding is that we should get (and return) only EOF when the pipe will not provide data anymore at any point in the future

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm initially thinking of changing the condition to:

remainingOut := r.decompSize - r.decompOff
if err == io.EOF && r.compressionLeft == 0 && remainingOut == 0 {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that if r.decompSize < len(r.decompressionBuffer), ie if output.pos < output.size, the decoder flushed all it could (see https://github.com/DataDog/zstd/blob/1.x/zstd.h#L774); in other words the decoder needs more data in order to output anything else.

The line you linked seems to refer to a special condition if len(r.decompressionBuffer) > ZSTD_BLOCKSIZE_MAX. However this will never be the case, because the decompression buffer is allocated by a pool to a size of ZSTD_DStreamOutSize, which is == ZSTD_BLOCKSIZE_MAX. (Which is the purpose of using ZSTD_DStreamOutSize().)

So the condition looks correct to me as is. What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! I didn't read on ZSTD_DStreamOutSize == ZSTD_BLOCKSIZE_MAX but yes you are correct

err = errShortRead
if r.decompOff > 0 {
return r.decompOff, nil
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking also about a input stream that got accidentaly cut off (so it starts returning EOF but we still have some zstd partial data), we could return a io.UnexpectedEOF
https://golang.org/pkg/io/#pkg-variables

Functions should return EOF only to signal a graceful end of input. If the EOF occurs unexpectedly in a structured data stream, the appropriate error is either ErrUnexpectedEOF or some other error giving more detail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Viq111
Copy link
Collaborator

Viq111 commented Jun 4, 2021

For benchmarking, I think those changes are immaterial so I would not worry about them:

ᐅ benchstat
goos: darwin
goarch: amd64
pkg: github.com/DataDog/zstd
cpu: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
name                    old time/op    new time/op    delta
CtxCompression-16         46.8µs ± 1%    50.5µs ±13%   ~     (p=0.065 n=9+10)
CtxDecompression-16       2.70µs ± 5%    2.95µs ±15%   ~     (p=0.247 n=10+10)
StreamCompression-16       869ns ±13%     898ns ±12%   ~     (p=0.190 n=10+10)
StreamDecompression-16    2.50µs ± 7%    2.57µs ±14%   ~     (p=0.952 n=10+10)
Compression-16            51.0µs ± 8%    52.1µs ± 8%   ~     (p=0.684 n=10+10)
Decompression-16          3.07µs ±16%    3.11µs ±15%   ~     (p=0.542 n=10+10)

name                    old speed      new speed      delta
CtxCompression-16        151MB/s ± 1%   140MB/s ±12%   ~     (p=0.065 n=9+10)
CtxDecompression-16     2.61GB/s ± 5%  2.42GB/s ±14%   ~     (p=0.247 n=10+10)
StreamCompression-16    8.15GB/s ±12%  7.90GB/s ±11%   ~     (p=0.190 n=10+10)
StreamDecompression-16  2.83GB/s ± 7%  2.76GB/s ±13%   ~     (p=0.971 n=10+10)
Compression-16           139MB/s ± 7%   136MB/s ± 8%   ~     (p=0.684 n=10+10)
Decompression-16        2.31GB/s ±14%  2.28GB/s ±13%   ~     (p=0.529 n=10+10)

reader.Read used to try to fully read an internal buffer until EOF or
the buffer was filled. That was buffer was set to ZSTD_DStreamInSize,
which is larger than any zstd block.

This means that reader.Read could try to buffer much more data than
what was needed to process and return a single block from the Read
method.

This was an issue because we could miss an urgent Flush from a
corresponding Writer by blocking. (A typical use case is instant
messaging.) It was also against the general convention of io.Read that a
single call should return any immediately available data without
blocking, if any.

Interestingly enough, the test case should have caught this up, but
because we used a bytes.Buffer, the Read returned io.EOF after reading
the entirety of the buffer, even if we appended to the buffer later on.
The test case is also fixed by this commit.

Fixes: DataDog#95
@delthas delthas force-pushed the fix-stream-read branch from 25758a4 to 42c5dcb Compare July 9, 2021 13:36
@delthas
Copy link
Contributor Author

delthas commented Oct 13, 2021

Gentle ping 😀

@Viq111 Viq111 merged commit c4c921b into DataDog:1.x Nov 24, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[zstd_stream] Reader.Read can block even if a zstd block is available
2 participants