Skip to content

Commit

Permalink
Merge pull request #1040 from ClickHouse/compressor_etc
Browse files Browse the repository at this point in the history
perf(compressor): reduce memory by using new compression code
  • Loading branch information
SpencerTorres authored Feb 13, 2025
2 parents 4cdb83a + 65a3012 commit aadb7ee
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 104 deletions.
11 changes: 4 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ type Client struct {

// compressor performs block compression,
// see encodeBlock.
compressor *compress.Writer
compression proto.Compression
compressionMethod compress.Method
compressor *compress.Writer
compression proto.Compression

settings []Setting
}
Expand Down Expand Up @@ -491,7 +490,6 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
}

var (
compressor = compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel))
compression proto.Compression
compressionMethod compress.Method
)
Expand Down Expand Up @@ -525,9 +523,8 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {

readTimeout: opt.ReadTimeout,

compression: compression,
compressionMethod: compressionMethod,
compressor: compressor,
compression: compression,
compressor: compress.NewWriter(compress.Level(opt.CompressionLevel), compressionMethod),

version: ver,
protocolVersion: opt.ProtocolVersion,
Expand Down
48 changes: 8 additions & 40 deletions compress/compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func TestCompress(t *testing.T) {
for i := range MethodValues() {
m := MethodValues()[i]
t.Run(m.String(), func(t *testing.T) {
w := NewWriter()
require.NoError(t, w.Compress(m, data))
w := NewWriter(LevelZero, m)
require.NoError(t, w.Compress(data))

gold.Bytes(t, w.Data, "data_compressed_"+strings.ToLower(m.String()))

Expand Down Expand Up @@ -77,39 +77,6 @@ func TestCompress(t *testing.T) {
}
}

func TestCompressWithMethod(t *testing.T) {
data := []byte(strings.Repeat("Hello!\n", 25))
gold.Bytes(t, data, "data_raw")

for i := range MethodValues() {
m := MethodValues()[i]
t.Run(m.String(), func(t *testing.T) {
// Create a writer that only supports this method.
w := NewWriterWithMethods(0, m)
require.NoError(t, w.Compress(m, data))

// Using none should also work
require.NoError(t, w.Compress(None, data))

// Using a different method should fail.
nextMethod := Method((int(m) + 1) % NumMethods)
if nextMethod == None {
nextMethod = LZ4
}
require.Errorf(t, w.Compress(nextMethod, data), "writer was not configured to accept method: %s", nextMethod.String())
})
}

// Create a writer that only supports 2 methods.
t.Run("LZ4+ZSTD", func(t *testing.T) {
w := NewWriterWithMethods(0, LZ4, ZSTD)
require.NoError(t, w.Compress(None, data), "none method should always be accepted")
require.NoError(t, w.Compress(ZSTD, data))
require.NoError(t, w.Compress(LZ4, data))
require.Errorf(t, w.Compress(LZ4HC, data), "writer was not configured to accept method: LZ4HC")
})
}

func BenchmarkWriter_Compress(b *testing.B) {
// Highly compressible data.
data := bytes.Repeat([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, 1800)
Expand All @@ -120,15 +87,16 @@ func BenchmarkWriter_Compress(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(len(data)))

w := NewWriter()
w := NewWriter(LevelZero, m)

// First round to warmup.
if err := w.Compress(m, data); err != nil {
if err := w.Compress(data); err != nil {
b.Fatal(err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := w.Compress(m, data); err != nil {
if err := w.Compress(data); err != nil {
b.Fatal(err)
}
}
Expand All @@ -153,8 +121,8 @@ func BenchmarkReader_Read(b *testing.B) {
for i := range MethodValues() {
m := MethodValues()[i]
b.Run(m.String(), func(b *testing.B) {
w := NewWriter()
if err := w.Compress(m, data); err != nil {
w := NewWriter(LevelZero, m)
if err := w.Compress(data); err != nil {
b.Fatal(err)
}
b.ReportAllocs()
Expand Down
8 changes: 4 additions & 4 deletions compress/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ func FuzzWriter_Compress(f *testing.F) {
f.Add([]byte{})
f.Add([]byte{1, 2, 3, 4, 5})
f.Fuzz(func(t *testing.T, data []byte) {
w := NewWriter()
require.NoError(t, w.Compress(LZ4, data))
w := NewWriter(LevelZero, LZ4)
require.NoError(t, w.Compress(data))

r := NewReader(bytes.NewReader(w.Data))
out := make([]byte, len(data))
Expand All @@ -32,8 +32,8 @@ func FuzzReader_Read(f *testing.F) {
[]byte("Hello, world!"),
{1, 2, 3, 4, 5},
} {
w := NewWriter()
require.NoError(f, w.Compress(LZ4, data))
w := NewWriter(LevelZero, LZ4)
require.NoError(f, w.Compress(data))
f.Add(w.Data)
}

Expand Down
81 changes: 30 additions & 51 deletions compress/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,32 @@ import (
)

const (
CompressionLevelLZ4HCDefault Level = 9
CompressionLevelLZ4HCMax Level = 12
LevelZero Level = 0
LevelLZ4HCDefault Level = 9
LevelLZ4HCMax Level = 12
)

// Writer encodes compressed blocks.
type Writer struct {
Data []byte

methods [NumMethods]bool // methods supported by this writer
lz4 *lz4.Compressor
lz4hc *lz4.CompressorHC
zstd *zstd.Encoder
method Method

lz4 *lz4.Compressor
lz4hc *lz4.CompressorHC
zstd *zstd.Encoder
}

// Compress buf into Data.
func (w *Writer) Compress(m Method, buf []byte) error {
if !w.methods[m] {
return errors.Errorf("writer was not configured to accept method: %s", m.String())
}

func (w *Writer) Compress(buf []byte) error {
maxSize := lz4.CompressBlockBound(len(buf))
w.Data = append(w.Data[:0], make([]byte, maxSize+headerSize)...)
_ = w.Data[:headerSize]
w.Data[hMethod] = byte(methodTable[m])
w.Data[hMethod] = byte(methodTable[w.method])

var n int

switch m {
switch w.method {
case LZ4:
compressedSize, err := w.lz4.CompressBlock(buf, w.Data[headerSize:])
if err != nil {
Expand Down Expand Up @@ -69,20 +67,25 @@ func (w *Writer) Compress(m Method, buf []byte) error {
return nil
}

// NewWriterWithMethods creates a new Writer with the specified compression level that supports only the specified methods.
func NewWriterWithMethods(l Level, m ...Method) *Writer {
var methods [NumMethods]bool
methods[None] = true // None is always supported
for _, method := range m {
methods[method] = true
}

// NewWriter creates a new Writer with the specified compression level that supports the specified method.
func NewWriter(l Level, m Method) *Writer {
var err error
var zstdWriter *zstd.Encoder
var lz4Writer *lz4.Compressor
var lz4hcWriter *lz4.CompressorHC

if methods[ZSTD] {
switch m {
case LZ4:
lz4Writer = &lz4.Compressor{}
case LZ4HC:
levelLZ4HC := l
if levelLZ4HC == 0 {
levelLZ4HC = LevelLZ4HCDefault
} else {
levelLZ4HC = Level(math.Min(float64(levelLZ4HC), float64(LevelLZ4HCMax)))
}
lz4hcWriter = &lz4.CompressorHC{Level: lz4.CompressionLevel(1 << (8 + levelLZ4HC))}
case ZSTD:
zstdWriter, err = zstd.NewWriter(nil,
zstd.WithEncoderLevel(zstd.SpeedDefault),
zstd.WithEncoderConcurrency(1),
Expand All @@ -91,37 +94,13 @@ func NewWriterWithMethods(l Level, m ...Method) *Writer {
if err != nil {
panic(err)
}
}

if methods[LZ4] {
lz4Writer = &lz4.Compressor{}
}

if methods[LZ4HC] {
// handle level for LZ4HC
levelLZ4HC := l
if levelLZ4HC == 0 {
levelLZ4HC = CompressionLevelLZ4HCDefault
} else {
levelLZ4HC = Level(math.Min(float64(levelLZ4HC), float64(CompressionLevelLZ4HCMax)))
}
lz4hcWriter = &lz4.CompressorHC{Level: lz4.CompressionLevel(1 << (8 + levelLZ4HC))}
default:
}

return &Writer{
methods: methods,
lz4: lz4Writer,
lz4hc: lz4hcWriter,
zstd: zstdWriter,
method: m,
lz4: lz4Writer,
lz4hc: lz4hcWriter,
zstd: zstdWriter,
}
}

// NewWriterWithLevel creates a new Writer with the specified compression level that supports all methods.
func NewWriterWithLevel(l Level) *Writer {
return NewWriterWithMethods(l, MethodValues()...)
}

// NewWriter creates a new Writer with compression level 0 that supports all methods.
func NewWriter() *Writer {
return NewWriterWithLevel(0)
}
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot
// See "Compressible" method of server or client code for reference.
if c.compression == proto.CompressionEnabled {
data := buf.Buf[start:]
if err := c.compressor.Compress(c.compressionMethod, data); err != nil {
if err := c.compressor.Compress(data); err != nil {
rerr = errors.Wrap(err, "compress")
return
}
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (s *Server) handle(conn net.Conn) error {
Revision: s.ver,
},
tz: time.UTC,
compressor: compress.NewWriter(),
compressor: compress.NewWriter(compress.LevelZero, compress.None),
}
return sConn.Handle()
}
Expand Down

0 comments on commit aadb7ee

Please sign in to comment.