addressed comments on BulkProcessor
Barbayar committed Apr 4, 2022
1 parent 0114227 commit 17aac71
Showing 2 changed files with 217 additions and 85 deletions.
86 changes: 63 additions & 23 deletions zstd_bulk.go
@@ -6,37 +6,62 @@ package zstd
import "C"
import (
"errors"
"runtime"
"unsafe"
)

- // BulkProcessor implements Bulk processing dictionary API
+ var (
+ // ErrEmptyDictionary is returned when the given dictionary is empty
+ ErrEmptyDictionary = errors.New("Dictionary is empty")
+ // ErrBadDictionary is returned when the given dictionary cannot be loaded
+ ErrBadDictionary = errors.New("Cannot load dictionary")
+ // ErrContentSize is returned when the content size cannot be determined
+ ErrContentSize = errors.New("Cannot determine the content size")
+ )

+ // BulkProcessor implements Bulk processing dictionary API.
+ // When compressing multiple messages or blocks using the same dictionary,
+ // it's recommended to digest the dictionary only once, since it's a costly operation.
+ // NewBulkProcessor() will create a state from digesting a dictionary.
+ // The resulting state can be used for future compression/decompression operations with very limited startup cost.
+ // BulkProcessor can be created once and shared by multiple threads concurrently, since its usage is read-only.
+ // The state will be freed when gc cleans up BulkProcessor.
type BulkProcessor struct {
cDict *C.struct_ZSTD_CDict_s
dDict *C.struct_ZSTD_DDict_s
}

// NewBulkProcessor creates a new BulkProcessor with a pre-trained dictionary and compression level
func NewBulkProcessor(dictionary []byte, compressionLevel int) (*BulkProcessor, error) {
+ if len(dictionary) < 1 {
+ return nil, ErrEmptyDictionary
+ }

p := &BulkProcessor{}
+ runtime.SetFinalizer(p, finalizeBulkProcessor)

p.cDict = C.ZSTD_createCDict(
unsafe.Pointer(&dictionary[0]),
C.size_t(len(dictionary)),
C.int(compressionLevel),
)
if p.cDict == nil {
return nil, errors.New("failed to create dictionary")
return nil, ErrBadDictionary
}
p.dDict = C.ZSTD_createDDict(
unsafe.Pointer(&dictionary[0]),
C.size_t(len(dictionary)),
)
if p.dDict == nil {
return nil, errors.New("failed to create dictionary")
return nil, ErrBadDictionary
}

return p, nil
}
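
For context, a minimal usage sketch of this API (illustrative only, not part of the commit). It assumes the package is imported as github.com/DataDog/zstd, that zstd.DefaultCompression is the package's default level constant, and that message.dict is a hypothetical dictionary file produced by a trainer such as `zstd --train`:

package main

import (
	"errors"
	"log"
	"os"

	"github.com/DataDog/zstd"
)

func main() {
	// Load a pre-trained dictionary (the file name is hypothetical).
	dict, err := os.ReadFile("message.dict")
	if err != nil {
		log.Fatal(err)
	}

	// Digest the dictionary once; the resulting state is then reused
	// for every Compress/Decompress call.
	bp, err := zstd.NewBulkProcessor(dict, zstd.DefaultCompression)
	switch {
	case errors.Is(err, zstd.ErrEmptyDictionary):
		log.Fatal("the dictionary file was empty")
	case errors.Is(err, zstd.ErrBadDictionary):
		log.Fatal("the dictionary could not be loaded")
	case err != nil:
		log.Fatal(err)
	}

	compressed, err := bp.Compress(nil, []byte("hello, bulk dictionary API"))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("compressed to %d bytes", len(compressed))
}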

- // Compress compresses the `src` with the dictionary
+ // Compress compresses `src` into `dst` with the dictionary given when creating the BulkProcessor.
+ // If you have a buffer to use, you can pass it to prevent allocation.
+ // If it is too small, or if nil is passed, a new buffer will be allocated and returned.
func (p *BulkProcessor) Compress(dst, src []byte) ([]byte, error) {
bound := CompressBound(len(src))
if cap(dst) >= bound {
@@ -45,22 +70,31 @@ func (p *BulkProcessor) Compress(dst, src []byte) ([]byte, error) {
dst = make([]byte, bound)
}

- var cSrc unsafe.Pointer
+ cctx := C.ZSTD_createCCtx()
+ // We need unsafe.Pointer(&src[0]) in the Cgo call to avoid "Go pointer to Go pointer" panics.
+ // This means we need to special case empty input. See:
+ // https://github.com/golang/go/issues/14210#issuecomment-346402945
+ var cWritten C.size_t
if len(src) == 0 {
- cSrc = unsafe.Pointer(nil)
+ cWritten = C.ZSTD_compress_usingCDict(
+ cctx,
+ unsafe.Pointer(&dst[0]),
+ C.size_t(len(dst)),
+ unsafe.Pointer(nil),
+ C.size_t(len(src)),
+ p.cDict,
+ )
} else {
- cSrc = unsafe.Pointer(&src[0])
+ cWritten = C.ZSTD_compress_usingCDict(
+ cctx,
+ unsafe.Pointer(&dst[0]),
+ C.size_t(len(dst)),
+ unsafe.Pointer(&src[0]),
+ C.size_t(len(src)),
+ p.cDict,
+ )
}

- cctx := C.ZSTD_createCCtx()
- cWritten := C.ZSTD_compress_usingCDict(
- cctx,
- unsafe.Pointer(&dst[0]),
- C.size_t(len(dst)),
- cSrc,
- C.size_t(len(src)),
- p.cDict,
- )
C.ZSTD_freeCCtx(cctx)

written := int(cWritten)
@@ -70,14 +104,16 @@ func (p *BulkProcessor) Compress(dst, src []byte) ([]byte, error) {
return dst[:written], nil
}
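
The dst parameter lets callers amortize allocations when compressing many messages with the same processor. A hedged sketch of that pattern, continuing the example above (compressMany is a made-up helper, not a library function):

// compressMany compresses each message with a shared BulkProcessor and
// reuses one scratch buffer across calls so the compression buffer is not
// reallocated every time. (Illustrative helper, not part of the library.)
func compressMany(bp *zstd.BulkProcessor, msgs [][]byte) ([][]byte, error) {
	out := make([][]byte, 0, len(msgs))
	var scratch []byte // grown by the first Compress call, reused afterwards
	for _, m := range msgs {
		compressed, err := bp.Compress(scratch, m)
		if err != nil {
			return nil, err
		}
		// Keep the (possibly reallocated) buffer for the next iteration,
		// and copy the result out since the next call will overwrite it.
		scratch = compressed[:cap(compressed)]
		out = append(out, append([]byte(nil), compressed...))
	}
	return out, nil
}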

- // Decompress compresses the `dst` with the dictionary
+ // Decompress decompresses `src` into `dst` with the dictionary given when creating the BulkProcessor.
+ // If you have a buffer to use, you can pass it to prevent allocation.
+ // If it is too small, or if nil is passed, a new buffer will be allocated and returned.
func (p *BulkProcessor) Decompress(dst, src []byte) ([]byte, error) {
if len(src) == 0 {
- return []byte{}, ErrEmptySlice
+ return nil, ErrEmptySlice
}
contentSize := uint64(C.ZSTD_getFrameContentSize(unsafe.Pointer(&src[0]), C.size_t(len(src))))
if contentSize == C.ZSTD_CONTENTSIZE_ERROR || contentSize == C.ZSTD_CONTENTSIZE_UNKNOWN {
return nil, errors.New("could not determine the content size")
return nil, ErrContentSize
}

if cap(dst) >= int(contentSize) {
@@ -109,8 +145,12 @@ func (p *BulkProcessor) Decompress(dst, src []byte) ([]byte, error) {
return dst[:written], nil
}
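
Decompress sizes its output from the frame header, so it expects frames that record their content size (which Compress produces); otherwise it returns ErrContentSize. A small round-trip sketch under the same assumptions as above (extra imports: bytes, errors; roundTrip is a made-up helper):

// roundTrip compresses a payload and decompresses it with the same
// processor as a sanity check. (Illustrative helper, not part of the library.)
func roundTrip(bp *zstd.BulkProcessor, payload []byte) error {
	compressed, err := bp.Compress(nil, payload)
	if err != nil {
		return err
	}
	decompressed, err := bp.Decompress(nil, compressed)
	if errors.Is(err, zstd.ErrContentSize) {
		// The frame does not record its decompressed size, which this
		// API needs in order to size the output buffer up front.
		return err
	}
	if err != nil {
		return err
	}
	if !bytes.Equal(payload, decompressed) {
		return errors.New("round-trip mismatch")
	}
	return nil
}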

- // Cleanup frees compression and decompression dictionaries from memory
- func (p *BulkProcessor) Cleanup() {
- C.ZSTD_freeCDict(p.cDict)
- C.ZSTD_freeDDict(p.dDict)
- }
+ // finalizeBulkProcessor frees compression and decompression dictionaries from memory
+ func finalizeBulkProcessor(p *BulkProcessor) {
+ if p.cDict != nil {
+ C.ZSTD_freeCDict(p.cDict)
+ }
+ if p.dDict != nil {
+ C.ZSTD_freeDDict(p.dDict)
+ }
+ }
}
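
This commit replaces the explicit Cleanup() method with a finalizer, so callers no longer free the dictionaries themselves. Since the digested state is read-only after construction (per the doc comment above), one BulkProcessor can also serve many goroutines; each Compress call creates and frees its own compression context. A sketch under the same assumptions (extra import: sync; compressConcurrently is a made-up helper):

// compressConcurrently shares one BulkProcessor across goroutines, which is
// safe because the digested dictionary state is read-only after
// NewBulkProcessor returns. (Illustrative helper, not part of the library.)
func compressConcurrently(bp *zstd.BulkProcessor, msgs [][]byte) ([][]byte, error) {
	out := make([][]byte, len(msgs))
	errs := make([]error, len(msgs))
	var wg sync.WaitGroup
	for i, m := range msgs {
		wg.Add(1)
		go func(i int, m []byte) {
			defer wg.Done()
			// Pass nil for dst so no buffer is shared between goroutines.
			out[i], errs[i] = bp.Compress(nil, m)
		}(i, m)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return out, nil
}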