diff --git a/CHANGELOG.md b/CHANGELOG.md index 24dcf08e6..7569fa9f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Supported pool of encoders, which implement ResetableWriter interface + ## v3.97.0 * Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index a1ca73ad9..8bd5ea17b 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -1,6 +1,7 @@ package topicwriterinternal import ( + "bytes" "compress/gzip" "fmt" "io" @@ -17,28 +18,107 @@ const ( codecUnknown = rawtopiccommon.CodecUNSPECIFIED ) -type EncoderMap struct { +type PublicResetableWriter interface { + io.WriteCloser + Reset(wr io.Writer) +} + +type encoderPool struct { + pool sync.Pool +} + +func (p *encoderPool) Get() PublicResetableWriter { + enc, _ := p.pool.Get().(PublicResetableWriter) + + return enc +} + +func (p *encoderPool) Put(wr PublicResetableWriter) { + p.pool.Put(wr) +} + +func newEncoderPool() *encoderPool { + return &encoderPool{ + pool: sync.Pool{}, + } +} + +type MultiEncoder struct { m map[rawtopiccommon.Codec]PublicCreateEncoderFunc + + bp xsync.Pool[bytes.Buffer] + ep map[rawtopiccommon.Codec]*encoderPool } -func NewEncoderMap() *EncoderMap { - return &EncoderMap{ - m: map[rawtopiccommon.Codec]PublicCreateEncoderFunc{ - rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) { - return nopWriteCloser{writer}, nil - }, - rawtopiccommon.CodecGzip: func(writer io.Writer) (io.WriteCloser, error) { - return gzip.NewWriter(writer), nil +func NewMultiEncoder() *MultiEncoder { + me := &MultiEncoder{ + m: make(map[rawtopiccommon.Codec]PublicCreateEncoderFunc), + bp: xsync.Pool[bytes.Buffer]{ + New: func() *bytes.Buffer { + return &bytes.Buffer{} }, }, + ep: make(map[rawtopiccommon.Codec]*encoderPool), } + + me.AddEncoder(rawtopiccommon.CodecRaw, func(writer io.Writer) (io.WriteCloser, error) { + return nopWriteCloser{writer}, nil + }) + me.AddEncoder(rawtopiccommon.CodecGzip, func(writer io.Writer) (io.WriteCloser, error) { + return gzip.NewWriter(writer), nil + }) + + return me } -func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) { +func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) { e.m[codec] = creator + e.ep[codec] = newEncoderPool() } -func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { +func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, source io.Reader) (int, error) { + buf := e.bp.GetOrNew() + defer e.bp.Put(buf) + + buf.Reset() + if _, err := buf.ReadFrom(source); err != nil { + return 0, err + } + + return e.EncodeBytes(codec, target, buf.Bytes()) +} + +func (e *MultiEncoder) EncodeBytes(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { + enc, err := e.createEncodeWriter(codec, target) + if err != nil { + return 0, err + } + + n, err := enc.Write(data) + if err == nil { + err = enc.Close() + } + if err != nil { + return 0, err + } + + if resetableEnc, ok := enc.(PublicResetableWriter); ok { + e.ep[codec].Put(resetableEnc) + } + + return n, nil +} + +func (e *MultiEncoder) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { + if ePool, ok := e.ep[codec]; ok { + wr := ePool.Get() + if wr != nil { + wr.Reset(target) + + return wr, nil + } + } + if encoderCreator, ok := e.m[codec]; ok { return encoderCreator(target) } @@ -46,7 +126,7 @@ func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target i return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec))) } -func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { +func (e *MultiEncoder) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { res := make(rawtopiccommon.SupportedCodecs, 0, len(e.m)) for codec := range e.m { res = append(res, codec) @@ -55,7 +135,7 @@ func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { return res } -func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool { +func (e *MultiEncoder) IsSupported(codec rawtopiccommon.Codec) bool { _, ok := e.m[codec] return ok @@ -73,7 +153,7 @@ func (nopWriteCloser) Close() error { // EncoderSelector not thread safe type EncoderSelector struct { - m *EncoderMap + m *MultiEncoder tracer *trace.Topic writerReconnectorID string @@ -87,7 +167,7 @@ type EncoderSelector struct { } func NewEncoderSelector( - m *EncoderMap, + m *MultiEncoder, allowedCodecs rawtopiccommon.SupportedCodecs, parallelCompressors int, tracer *trace.Topic, diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index 63b929068..b2bf7bd97 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -2,6 +2,9 @@ package topicwriterinternal import ( "bytes" + "compress/gzip" + "fmt" + "io" "strings" "testing" @@ -20,7 +23,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { }) t.Run("One", func(t *testing.T) { s := NewEncoderSelector( - NewEncoderMap(), + NewMultiEncoder(), rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw}, 1, &trace.Topic{}, @@ -206,3 +209,89 @@ func TestCompressMessages(t *testing.T) { require.Error(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount)) }) } + +func TestMultiEncoder(t *testing.T) { + decompressGzip := func(rd io.Reader) string { + gzreader, err := gzip.NewReader(rd) + require.NoError(t, err) + + decompressedMsg, err := io.ReadAll(gzreader) + require.NoError(t, err) + + return string(decompressedMsg) + } + + t.Run("BuffersPool", func(t *testing.T) { + testMultiEncoder := NewMultiEncoder() + + buf := &bytes.Buffer{} + for i := 0; i < 50; i++ { + testMsg := []byte(fmt.Sprintf("test_data_%d", i)) + + buf.Reset() + _, err := testMultiEncoder.Encode(rawtopiccommon.CodecGzip, buf, bytes.NewReader(testMsg)) + require.NoError(t, err) + + require.Equal(t, string(testMsg), decompressGzip(buf)) + } + }) + + t.Run("NotResetableWriter", func(t *testing.T) { + testMultiEncoder := NewMultiEncoder() + require.Len(t, testMultiEncoder.ep, 2) + + buf := &bytes.Buffer{} + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecRaw, buf, []byte("test_data")) + require.NoError(t, err) + require.Equal(t, "test_data", buf.String()) + }) + + t.Run("ResetableWriterCustom", func(t *testing.T) { + testMultiEncoder := NewMultiEncoder() + + customCodecCode := rawtopiccommon.Codec(10001) + testMultiEncoder.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) { + return gzip.NewWriter(writer), nil + }) + require.Len(t, testMultiEncoder.ep, 3) + + buf := &bytes.Buffer{} + _, err := testMultiEncoder.EncodeBytes(customCodecCode, buf, []byte("test_data_1")) + require.NoError(t, err) + require.Equal(t, "test_data_1", decompressGzip(buf)) + + buf.Reset() + _, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) + require.NoError(t, err) + require.Equal(t, "test_data_2", decompressGzip(buf)) + }) + + t.Run("ResetableWriter", func(t *testing.T) { + testMultiEncoder := NewMultiEncoder() + + buf := &bytes.Buffer{} + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_1")) + require.NoError(t, err) + require.Equal(t, "test_data_1", decompressGzip(buf)) + + buf.Reset() + _, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) + require.NoError(t, err) + require.Equal(t, "test_data_2", decompressGzip(buf)) + }) + + t.Run("ResetableWriterManyMessages", func(t *testing.T) { + testMultiEncoder := NewMultiEncoder() + + buf := &bytes.Buffer{} + for i := 0; i < 50; i++ { + testMsg := []byte(fmt.Sprintf("test_data_%d", i)) + + buf.Reset() + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, testMsg) + require.NoError(t, err) + + require.Equal(t, string(testMsg), decompressGzip(buf)) + } + }) +} diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index 00c17be3f..0e82532fe 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -67,7 +67,7 @@ type messageWithDataContent struct { bufCodec rawtopiccommon.Codec bufEncoded bytes.Buffer rawBuf bytes.Buffer - encoders *EncoderMap + encoders *MultiEncoder BufUncompressedSize int } @@ -111,18 +111,7 @@ func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([ m.bufEncoded.Reset() - writer, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded) - if err != nil { - return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( - "ydb: failed create encoder for message, codec '%v': %w", - codec, - err, - ))) - } - _, err = writer.Write(m.rawBuf.Bytes()) - if err == nil { - err = writer.Close() - } + _, err := m.encoders.EncodeBytes(codec, &m.bufEncoded, m.rawBuf.Bytes()) if err != nil { return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed to compress message, codec '%v': %w", @@ -157,19 +146,12 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code m.bufCodec = codec m.bufEncoded.Reset() - encoder, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded) - if err != nil { - return err - } - reader := m.Data if reader == nil { reader = &bytes.Reader{} } - bytesCount, err := io.Copy(encoder, reader) - if err == nil { - err = encoder.Close() - } + + bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, reader) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed compress message with codec '%v': %w", @@ -177,7 +159,7 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code err, ))) } - m.BufUncompressedSize = int(bytesCount) + m.BufUncompressedSize = bytesCount m.Data = nil return nil @@ -218,7 +200,7 @@ func (m *messageWithDataContent) getEncodedBytes(codec rawtopiccommon.Codec) ([] func newMessageDataWithContent( message PublicMessage, //nolint:gocritic - encoders *EncoderMap, + encoders *MultiEncoder, ) messageWithDataContent { return messageWithDataContent{ PublicMessage: message, diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index d0e182a05..a7632c4a4 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -124,7 +124,7 @@ type WriterReconnector struct { semaphore *semaphore.Weighted firstInitResponseProcessedChan empty.Chan lastSeqNo int64 - encodersMap *EncoderMap + encodersMap *MultiEncoder initDoneCh empty.Chan initInfo InitialInfo m xsync.RWMutex @@ -156,7 +156,7 @@ func newWriterReconnectorStopped( queue: newMessageQueue(), lastSeqNo: -1, firstInitResponseProcessedChan: make(empty.Chan), - encodersMap: NewEncoderMap(), + encodersMap: NewMultiEncoder(), writerInstanceID: writerInstanceID.String(), retrySettings: cfg.RetrySettings, } @@ -760,11 +760,11 @@ func createRawMessageData( return res, err } -func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *EncoderMap, +func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *MultiEncoder, serverCodecs rawtopiccommon.SupportedCodecs, ) rawtopiccommon.SupportedCodecs { if forceCodec != rawtopiccommon.CodecUNSPECIFIED { - if serverCodecs.AllowedByCodecsList(forceCodec) && encoderMap.IsSupported(forceCodec) { + if serverCodecs.AllowedByCodecsList(forceCodec) && multiEncoder.IsSupported(forceCodec) { return rawtopiccommon.SupportedCodecs{forceCodec} } @@ -779,7 +779,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *Encoder res := make(rawtopiccommon.SupportedCodecs, 0, len(serverCodecs)) for _, codec := range serverCodecs { - if encoderMap.IsSupported(codec) { + if multiEncoder.IsSupported(codec) { res = append(res, codec) } } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index dc89dcafb..405265399 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -26,7 +26,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" ) -var testCommonEncoders = NewEncoderMap() +var testCommonEncoders = NewMultiEncoder() func TestWriterImpl_AutoSeq(t *testing.T) { t.Run("OK", func(t *testing.T) { @@ -829,7 +829,7 @@ func TestSplitMessagesByBufCodec(t *testing.T) { func TestCalculateAllowedCodecs(t *testing.T) { customCodecSupported := rawtopiccommon.Codec(rawtopiccommon.CodecCustomerFirst) customCodecUnsupported := rawtopiccommon.Codec(rawtopiccommon.CodecCustomerFirst + 1) - encoders := NewEncoderMap() + encoders := NewMultiEncoder() encoders.AddEncoder(customCodecSupported, func(writer io.Writer) (io.WriteCloser, error) { return nil, errors.New("test") }) diff --git a/internal/topic/topicwriterinternal/writer_single_stream.go b/internal/topic/topicwriterinternal/writer_single_stream.go index d254aa075..6dbb3803c 100644 --- a/internal/topic/topicwriterinternal/writer_single_stream.go +++ b/internal/topic/topicwriterinternal/writer_single_stream.go @@ -23,7 +23,7 @@ type SingleStreamWriterConfig struct { stream RawTopicWriterStream queue *messageQueue - encodersMap *EncoderMap + encodersMap *MultiEncoder getLastSeqNum bool reconnectorInstanceID string } @@ -32,7 +32,7 @@ func newSingleStreamWriterConfig( common WritersCommonConfig, //nolint:gocritic stream RawTopicWriterStream, queue *messageQueue, - encodersMap *EncoderMap, + encodersMap *MultiEncoder, getLastSeqNum bool, reconnectorID string, ) SingleStreamWriterConfig { diff --git a/topic/topicoptions/topicoptions_writer.go b/topic/topicoptions/topicoptions_writer.go index 86d4b9c57..d74befa4e 100644 --- a/topic/topicoptions/topicoptions_writer.go +++ b/topic/topicoptions/topicoptions_writer.go @@ -19,8 +19,14 @@ type WriteSessionMetadata map[string]string // CreateEncoderFunc for create message decoders type CreateEncoderFunc = topicwriterinternal.PublicCreateEncoderFunc +// ResettableWriter is able to reset a nested writer between uses. +type ResetableWriter = topicwriterinternal.PublicResetableWriter + // WithWriterAddEncoder add custom codec implementation to writer. // It allows to set custom codecs implementations for custom and internal codecs. +// +// If CreateEncoderFunc returns a writer implementing ResetableWriter, then the compression objects +// will be reused for this codec. This will reduce the load on the GC. func WithWriterAddEncoder(codec topictypes.Codec, f CreateEncoderFunc) WriterOption { return topicwriterinternal.WithAddEncoder(rawtopiccommon.Codec(codec), f) }