zstd: Add decoder dictionary support (#260)
* zstd: Add decoder dictionary support
klauspost authored Jun 1, 2020
1 parent 057ea20 commit 90824b4
Showing 14 changed files with 479 additions and 111 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -15,8 +15,11 @@ This package provides various compression algorithms.

# changelog

* June 1, 2020 (v1.10.7): Added zstd decompression [dictionary support](https://github.com/klauspost/compress/tree/master/zstd#dictionaries).
* June 1, 2020: Increase zstd decompression speed up to 1.19x. [#259](https://github.com/klauspost/compress/pull/259)
* June 1, 2020: Remove internal reset call in zstd compression and reduce allocations. [#263](https://github.com/klauspost/compress/pull/263)
* May 21, 2020: (v1.10.6) zstd: Reduce allocations while decoding. [#258](https://github.com/klauspost/compress/pull/258), [#252](https://github.com/klauspost/compress/pull/252)
* May 21, 2020: zstd: Stricter decoding checks.
* May 21, 2020: zstd: Stricter decompression checks.
* April 12, 2020: (v1.10.5) s2-commands: Flush output when receiving SIGINT. [#239](https://github.com/klauspost/compress/pull/239)
* Apr 8, 2020: (v1.10.4) zstd: Minor/special case optimizations. [#251](https://github.com/klauspost/compress/pull/251), [#250](https://github.com/klauspost/compress/pull/250), [#249](https://github.com/klauspost/compress/pull/249), [#247](https://github.com/klauspost/compress/pull/247)
* Mar 11, 2020: (v1.10.3) s2: Use S2 encoder in pure Go mode for Snappy output as well. [#245](https://github.com/klauspost/compress/pull/245)
166 changes: 104 additions & 62 deletions huff0/decompress.go
@@ -155,20 +155,70 @@ func ReadTable(in []byte, s *Scratch) (s2 *Scratch, remain []byte, err error) {
// The length of the supplied input must match the end of a block exactly.
// Before this is called, the table must be initialized with ReadTable unless
// the encoder re-used the table.
// deprecated: Use the stateless Decoder() to get a concurrent version.
func (s *Scratch) Decompress1X(in []byte) (out []byte, err error) {
if len(s.dt.single) == 0 {
if cap(s.Out) < s.MaxDecodedSize {
s.Out = make([]byte, s.MaxDecodedSize)
}
s.Out = s.Out[:0:s.MaxDecodedSize]
s.Out, err = s.Decoder().Decompress1X(s.Out, in)
return s.Out, err
}

// Decompress4X will decompress a 4X encoded stream.
// Before this is called, the table must be initialized with ReadTable unless
// the encoder re-used the table.
// The length of the supplied input must match the end of a block exactly.
// The destination size of the uncompressed data must be known and provided.
// deprecated: Use the stateless Decoder() to get a concurrent version.
func (s *Scratch) Decompress4X(in []byte, dstSize int) (out []byte, err error) {
if dstSize > s.MaxDecodedSize {
return nil, ErrMaxDecodedSizeExceeded
}
if cap(s.Out) < dstSize {
s.Out = make([]byte, s.MaxDecodedSize)
}
s.Out = s.Out[:0:dstSize]
s.Out, err = s.Decoder().Decompress4X(s.Out, in)
return s.Out, err
}

// Decoder will return a stateless decoder that can be used by multiple
// decompressors concurrently.
// Before this is called, the table must be initialized with ReadTable.
// The Decoder is still linked to the scratch buffer, so that buffer cannot be reused.
// However, it is safe to discard the scratch.
func (s *Scratch) Decoder() *Decoder {
return &Decoder{
dt: s.dt,
actualTableLog: s.actualTableLog,
}
}

// Decoder provides stateless decoding.
type Decoder struct {
dt dTable
actualTableLog uint8
}

// Decompress1X will decompress a 1X encoded stream.
// The cap of the output buffer will be the maximum decompressed size.
// The length of the supplied input must match the end of a block exactly.
func (d *Decoder) Decompress1X(dst, src []byte) ([]byte, error) {
if len(d.dt.single) == 0 {
return nil, errors.New("no table loaded")
}
var br bitReader
err = br.init(in)
err := br.init(src)
if err != nil {
return nil, err
return dst, err
}
s.Out = s.Out[:0]
maxDecodedSize := cap(dst)
dst = dst[:0]

decode := func() byte {
val := br.peekBitsFast(s.actualTableLog) /* note : actualTableLog >= 1 */
v := s.dt.single[val]
val := br.peekBitsFast(d.actualTableLog) /* note : actualTableLog >= 1 */
v := d.dt.single[val]
br.bitsRead += uint8(v.entry)
return uint8(v.entry >> 8)
}
@@ -180,88 +230,80 @@ func (s *Scratch) Decompress1X(in []byte) (out []byte, err error) {
// Avoid bounds check by always having full sized table.
const tlSize = 1 << tableLogMax
const tlMask = tlSize - 1
dt := s.dt.single[:tlSize]
dt := d.dt.single[:tlSize]

// Use temp table to avoid bound checks/append penalty.
var tmp = s.huffWeight[:256]
var buf [256]byte
var off uint8

for br.off >= 8 {
br.fillFast()
tmp[off+0] = hasDec(dt[br.peekBitsFast(s.actualTableLog)&tlMask])
tmp[off+1] = hasDec(dt[br.peekBitsFast(s.actualTableLog)&tlMask])
buf[off+0] = hasDec(dt[br.peekBitsFast(d.actualTableLog)&tlMask])
buf[off+1] = hasDec(dt[br.peekBitsFast(d.actualTableLog)&tlMask])
br.fillFast()
tmp[off+2] = hasDec(dt[br.peekBitsFast(s.actualTableLog)&tlMask])
tmp[off+3] = hasDec(dt[br.peekBitsFast(s.actualTableLog)&tlMask])
buf[off+2] = hasDec(dt[br.peekBitsFast(d.actualTableLog)&tlMask])
buf[off+3] = hasDec(dt[br.peekBitsFast(d.actualTableLog)&tlMask])
off += 4
if off == 0 {
if len(s.Out)+256 > s.MaxDecodedSize {
if len(dst)+256 > maxDecodedSize {
br.close()
return nil, ErrMaxDecodedSizeExceeded
}
s.Out = append(s.Out, tmp...)
dst = append(dst, buf[:]...)
}
}

if len(s.Out)+int(off) > s.MaxDecodedSize {
if len(dst)+int(off) > maxDecodedSize {
br.close()
return nil, ErrMaxDecodedSizeExceeded
}
s.Out = append(s.Out, tmp[:off]...)
dst = append(dst, buf[:off]...)

for !br.finished() {
br.fill()
if len(s.Out) >= s.MaxDecodedSize {
if len(dst) >= maxDecodedSize {
br.close()
return nil, ErrMaxDecodedSizeExceeded
}
s.Out = append(s.Out, decode())
dst = append(dst, decode())
}
return s.Out, br.close()
return dst, br.close()
}

// Decompress4X will decompress a 4X encoded stream.
// Before this is called, the table must be initialized with ReadTable unless
// the encoder re-used the table.
// The length of the supplied input must match the end of a block exactly.
// The destination size of the uncompressed data must be known and provided.
func (s *Scratch) Decompress4X(in []byte, dstSize int) (out []byte, err error) {
// The *capacity* of the dst slice must match the destination size of
// the uncompressed data exactly.
func (s *Decoder) Decompress4X(dst, src []byte) ([]byte, error) {
if len(s.dt.single) == 0 {
return nil, errors.New("no table loaded")
}
if len(in) < 6+(4*1) {
if len(src) < 6+(4*1) {
return nil, errors.New("input too small")
}
if dstSize > s.MaxDecodedSize {
return nil, ErrMaxDecodedSizeExceeded
}
// TODO: We do not detect when we overrun a buffer, except if the last one does.

var br [4]bitReader
start := 6
for i := 0; i < 3; i++ {
length := int(in[i*2]) | (int(in[i*2+1]) << 8)
if start+length >= len(in) {
length := int(src[i*2]) | (int(src[i*2+1]) << 8)
if start+length >= len(src) {
return nil, errors.New("truncated input (or invalid offset)")
}
err = br[i].init(in[start : start+length])
err := br[i].init(src[start : start+length])
if err != nil {
return nil, err
}
start += length
}
err = br[3].init(in[start:])
err := br[3].init(src[start:])
if err != nil {
return nil, err
}

// Prepare output
if cap(s.Out) < dstSize {
s.Out = make([]byte, 0, dstSize)
}
s.Out = s.Out[:dstSize]
// destination, offset to match first output
dstOut := s.Out
dstSize := cap(dst)
dst = dst[:dstSize]
out := dst
dstEvery := (dstSize + 3) / 4

const tlSize = 1 << tableLogMax
@@ -276,7 +318,7 @@ func (s *Scratch) Decompress4X(in []byte, dstSize int) (out []byte, err error) {
}

// Use temp table to avoid bound checks/append penalty.
var tmp = s.huffWeight[:256]
var buf [256]byte
var off uint8
var decoded int

@@ -300,8 +342,8 @@ bigloop:

val2 := br[stream].peekBitsFast(s.actualTableLog)
v2 := single[val2&tlMask]
tmp[off+bufoff*stream+1] = uint8(v2.entry >> 8)
tmp[off+bufoff*stream] = uint8(v.entry >> 8)
buf[off+bufoff*stream+1] = uint8(v2.entry >> 8)
buf[off+bufoff*stream] = uint8(v.entry >> 8)
br[stream].bitsRead += uint8(v2.entry)
}

@@ -313,8 +355,8 @@

val2 := br[stream].peekBitsFast(s.actualTableLog)
v2 := single[val2&tlMask]
tmp[off+bufoff*stream+1] = uint8(v2.entry >> 8)
tmp[off+bufoff*stream] = uint8(v.entry >> 8)
buf[off+bufoff*stream+1] = uint8(v2.entry >> 8)
buf[off+bufoff*stream] = uint8(v.entry >> 8)
br[stream].bitsRead += uint8(v2.entry)
}

@@ -326,8 +368,8 @@

val2 := br[stream].peekBitsFast(s.actualTableLog)
v2 := single[val2&tlMask]
tmp[off+bufoff*stream+1] = uint8(v2.entry >> 8)
tmp[off+bufoff*stream] = uint8(v.entry >> 8)
buf[off+bufoff*stream+1] = uint8(v2.entry >> 8)
buf[off+bufoff*stream] = uint8(v.entry >> 8)
br[stream].bitsRead += uint8(v2.entry)
}

@@ -339,8 +381,8 @@

val2 := br[stream].peekBitsFast(s.actualTableLog)
v2 := single[val2&tlMask]
tmp[off+bufoff*stream+1] = uint8(v2.entry >> 8)
tmp[off+bufoff*stream] = uint8(v.entry >> 8)
buf[off+bufoff*stream+1] = uint8(v2.entry >> 8)
buf[off+bufoff*stream] = uint8(v.entry >> 8)
br[stream].bitsRead += uint8(v2.entry)
}

@@ -350,30 +392,30 @@
if bufoff > dstEvery {
return nil, errors.New("corruption detected: stream overrun 1")
}
copy(dstOut, tmp[:bufoff])
copy(dstOut[dstEvery:], tmp[bufoff:bufoff*2])
copy(dstOut[dstEvery*2:], tmp[bufoff*2:bufoff*3])
copy(dstOut[dstEvery*3:], tmp[bufoff*3:bufoff*4])
copy(out, buf[:bufoff])
copy(out[dstEvery:], buf[bufoff:bufoff*2])
copy(out[dstEvery*2:], buf[bufoff*2:bufoff*3])
copy(out[dstEvery*3:], buf[bufoff*3:bufoff*4])
off = 0
dstOut = dstOut[bufoff:]
out = out[bufoff:]
decoded += 256
// There must at least be 3 buffers left.
if len(dstOut) < dstEvery*3 {
if len(out) < dstEvery*3 {
return nil, errors.New("corruption detected: stream overrun 2")
}
}
}
if off > 0 {
ioff := int(off)
if len(dstOut) < dstEvery*3+ioff {
if len(out) < dstEvery*3+ioff {
return nil, errors.New("corruption detected: stream overrun 3")
}
copy(dstOut, tmp[:off])
copy(dstOut[dstEvery:dstEvery+ioff], tmp[bufoff:bufoff*2])
copy(dstOut[dstEvery*2:dstEvery*2+ioff], tmp[bufoff*2:bufoff*3])
copy(dstOut[dstEvery*3:dstEvery*3+ioff], tmp[bufoff*3:bufoff*4])
copy(out, buf[:off])
copy(out[dstEvery:dstEvery+ioff], buf[bufoff:bufoff*2])
copy(out[dstEvery*2:dstEvery*2+ioff], buf[bufoff*2:bufoff*3])
copy(out[dstEvery*3:dstEvery*3+ioff], buf[bufoff*3:bufoff*4])
decoded += int(off) * 4
dstOut = dstOut[off:]
out = out[off:]
}

// Decode remaining.
@@ -382,10 +424,10 @@
br := &br[i]
for !br.finished() {
br.fill()
if offset >= len(dstOut) {
if offset >= len(out) {
return nil, errors.New("corruption detected: stream overrun 4")
}
dstOut[offset] = decode(br)
out[offset] = decode(br)
offset++
}
decoded += offset - dstEvery*i
@@ -397,7 +439,7 @@ bigloop:
if dstSize != decoded {
return nil, errors.New("corruption detected: short output block")
}
return s.Out, nil
return dst, nil
}

// matches will compare a decoding table to a coding table.
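
The stateless huff0 `Decoder` added in this file can be used roughly as follows. This is a minimal sketch rather than code from the commit: the `decodeHuff0` helper is illustrative, and it assumes the input holds a serialized table followed by a single 1X-compressed stream with a known decompressed size.

```Go
package huffexample

import "github.com/klauspost/compress/huff0"

// decodeHuff0 decodes a blob assumed to contain a serialized Huffman table
// followed by a 1X-compressed stream. maxSize is the known decompressed size.
func decodeHuff0(blob []byte, maxSize int) ([]byte, error) {
	var s huff0.Scratch
	// ReadTable parses the table and returns the remaining (compressed) bytes.
	s2, remain, err := huff0.ReadTable(blob, &s)
	if err != nil {
		return nil, err
	}
	// Decoder returns a stateless decoder that can be shared between goroutines.
	dec := s2.Decoder()
	// For Decompress1X, the capacity of dst acts as the maximum decompressed size.
	return dec.Decompress1X(make([]byte, 0, maxSize), remain)
}
```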
14 changes: 14 additions & 0 deletions zstd/README.md
@@ -309,6 +309,20 @@ The decoder can be used for *concurrent* decompression of multiple buffers.
It will only allow a certain number of concurrent operations to run.
To tweak that yourself use the `WithDecoderConcurrency(n)` option when creating the decoder.
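
A minimal sketch of the above; the helper name is illustrative, not part of the package:

```Go
import "github.com/klauspost/compress/zstd"

// decompressWithLimit decodes src while allowing at most n concurrent
// block decompressions inside the decoder.
func decompressWithLimit(src []byte, n int) ([]byte, error) {
	dec, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(n))
	if err != nil {
		return nil, err
	}
	defer dec.Close()
	return dec.DecodeAll(src, nil)
}
```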

### Dictionaries

Data compressed with [dictionaries](https://github.com/facebook/zstd#the-case-for-small-data-compression) can be decompressed.

Dictionaries are added individually to Decoders.
Dictionaries are generated by the `zstd --train` command and contain an initial state for the decoder.
To add a dictionary, use `RegisterDict(data)` with the dictionary data before starting any decompression.

The dictionary will be used automatically for data that specifies it.

A re-used Decoder will still contain any previously registered dictionaries.

Registering a dictionary with an ID that is already in use will replace the existing one.
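
A minimal sketch of the flow described above (see the example below); `dictData` is assumed to be a dictionary produced by `zstd --train`, and the error return from `RegisterDict` is an assumption:

```Go
import "github.com/klauspost/compress/zstd"

// decompressWithDict decodes frames that may reference the given dictionary.
func decompressWithDict(src, dictData []byte) ([]byte, error) {
	dec, err := zstd.NewReader(nil)
	if err != nil {
		return nil, err
	}
	defer dec.Close()
	// Register the dictionary before decompressing; it is selected
	// automatically for frames that reference its ID.
	if err := dec.RegisterDict(dictData); err != nil {
		return nil, err
	}
	return dec.DecodeAll(src, nil)
}
```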

### Allocation-less operation

The decoder has been designed to operate without allocations after a warmup.