From e74eb549baafbb5fd58fe1e105b8527b7694594d Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 21 Jan 2021 17:10:51 +0100 Subject: [PATCH] Reverts flush buffer pooling. (#3210) Currently this requires more work, as the cache is asynchronously using the buffer when flushing chunks. We still benefits from other improvements like computing the right size for a chunk. Signed-off-by: Cyril Tovena --- pkg/ingester/flush.go | 10 +--------- pkg/ingester/flush_test.go | 24 +++++++++++++++++++++++- pkg/util/pool/bytesbuffer.go | 4 ++-- pkg/util/pool/bytesbuffer_test.go | 17 +++++++++++++++++ 4 files changed, 43 insertions(+), 12 deletions(-) create mode 100644 pkg/util/pool/bytesbuffer_test.go diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index f40d00211b532..8322f8745c606 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -323,12 +323,6 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP metric := labelsBuilder.Labels() wireChunks := make([]chunk.Chunk, len(cs)) - buffers := make([]*bytes.Buffer, len(cs)) - defer func() { - for j := range buffers { - chunksBufferPool.Put(buffers[j]) - } - }() // use anonymous function to make lock releasing simpler. err = func() error { @@ -349,13 +343,11 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP ) chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header - buffer := chunksBufferPool.Get(chunkSize) start := time.Now() - if err := ch.EncodeTo(buffer); err != nil { + if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkSize))); err != nil { return err } chunkEncodeTime.Observe(time.Since(start).Seconds()) - buffers[j] = buffer wireChunks[j] = ch } return nil diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 6655cc2f3373a..5918237b62836 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -97,6 +97,25 @@ func Benchmark_FlushLoop(b *testing.B) { } } +func Test_Flush(t *testing.T) { + var ( + store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil) + lbs = makeRandomLabels() + ctx = user.InjectOrgID(context.Background(), "foo") + ) + store.onPut = func(ctx context.Context, chunks []chunk.Chunk) error { + for _, c := range chunks { + buf, err := c.Encoded() + require.Nil(t, err) + if err := c.Decode(chunk.NewDecodeContext(), buf); err != nil { + return err + } + } + return nil + } + require.NoError(t, ing.flushChunks(ctx, 0, lbs, buildChunkDecs(t), &sync.RWMutex{})) +} + func buildChunkDecs(t testing.TB) []*chunkDesc { res := make([]*chunkDesc, 10) for i := range res { @@ -227,6 +246,7 @@ type testStore struct { mtx sync.Mutex // Chunks keyed by userID. chunks map[string][]chunk.Chunk + onPut func(ctx context.Context, chunks []chunk.Chunk) error } // Note: the ingester New() function creates it's own WAL first which we then override if specified. @@ -275,7 +295,9 @@ func defaultIngesterTestConfig(t testing.TB) Config { func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error { s.mtx.Lock() defer s.mtx.Unlock() - + if s.onPut != nil { + return s.onPut(ctx, chunks) + } userID, err := user.ExtractOrgID(ctx) if err != nil { return err diff --git a/pkg/util/pool/bytesbuffer.go b/pkg/util/pool/bytesbuffer.go index 5e1da7b8bb366..0a20a4fc1b7e1 100644 --- a/pkg/util/pool/bytesbuffer.go +++ b/pkg/util/pool/bytesbuffer.go @@ -44,13 +44,13 @@ func (p *BufferPool) Get(sz int) *bytes.Buffer { } b := p.buckets[i].Get() if b == nil { - b = bytes.NewBuffer(make([]byte, bktSize)) + b = bytes.NewBuffer(make([]byte, 0, bktSize)) } buf := b.(*bytes.Buffer) buf.Reset() return b.(*bytes.Buffer) } - return bytes.NewBuffer(make([]byte, sz)) + return bytes.NewBuffer(make([]byte, 0, sz)) } // Put adds a byte buffer to the right bucket in the pool. diff --git a/pkg/util/pool/bytesbuffer_test.go b/pkg/util/pool/bytesbuffer_test.go new file mode 100644 index 0000000000000..0fa955d01bcd2 --- /dev/null +++ b/pkg/util/pool/bytesbuffer_test.go @@ -0,0 +1,17 @@ +package pool + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_ZeroBuffer(t *testing.T) { + p := NewBuffer(2, 10, 2) + require.Equal(t, 0, p.Get(1).Len()) + require.Equal(t, 0, p.Get(1).Len()) + require.Equal(t, 0, p.Get(2).Len()) + require.Equal(t, 0, p.Get(2).Len()) + require.Equal(t, 0, p.Get(20).Len()) + require.Equal(t, 0, p.Get(20).Len()) +}