Skip to content

Commit

Permalink
Reverts flush buffer pooling. (#3210)
Browse files Browse the repository at this point in the history
Currently this requires more work, as the cache is asynchronously using the buffer when flushing chunks.

We still benefits from other improvements like computing the right size for a chunk.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jan 21, 2021
1 parent ba0df61 commit e74eb54
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 12 deletions.
10 changes: 1 addition & 9 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,6 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
metric := labelsBuilder.Labels()

wireChunks := make([]chunk.Chunk, len(cs))
buffers := make([]*bytes.Buffer, len(cs))
defer func() {
for j := range buffers {
chunksBufferPool.Put(buffers[j])
}
}()

// use anonymous function to make lock releasing simpler.
err = func() error {
Expand All @@ -349,13 +343,11 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
)

chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
buffer := chunksBufferPool.Get(chunkSize)
start := time.Now()
if err := ch.EncodeTo(buffer); err != nil {
if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkSize))); err != nil {
return err
}
chunkEncodeTime.Observe(time.Since(start).Seconds())
buffers[j] = buffer
wireChunks[j] = ch
}
return nil
Expand Down
24 changes: 23 additions & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,25 @@ func Benchmark_FlushLoop(b *testing.B) {
}
}

func Test_Flush(t *testing.T) {
var (
store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
lbs = makeRandomLabels()
ctx = user.InjectOrgID(context.Background(), "foo")
)
store.onPut = func(ctx context.Context, chunks []chunk.Chunk) error {
for _, c := range chunks {
buf, err := c.Encoded()
require.Nil(t, err)
if err := c.Decode(chunk.NewDecodeContext(), buf); err != nil {
return err
}
}
return nil
}
require.NoError(t, ing.flushChunks(ctx, 0, lbs, buildChunkDecs(t), &sync.RWMutex{}))
}

func buildChunkDecs(t testing.TB) []*chunkDesc {
res := make([]*chunkDesc, 10)
for i := range res {
Expand Down Expand Up @@ -227,6 +246,7 @@ type testStore struct {
mtx sync.Mutex
// Chunks keyed by userID.
chunks map[string][]chunk.Chunk
onPut func(ctx context.Context, chunks []chunk.Chunk) error
}

// Note: the ingester New() function creates it's own WAL first which we then override if specified.
Expand Down Expand Up @@ -275,7 +295,9 @@ func defaultIngesterTestConfig(t testing.TB) Config {
func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
s.mtx.Lock()
defer s.mtx.Unlock()

if s.onPut != nil {
return s.onPut(ctx, chunks)
}
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/pool/bytesbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func (p *BufferPool) Get(sz int) *bytes.Buffer {
}
b := p.buckets[i].Get()
if b == nil {
b = bytes.NewBuffer(make([]byte, bktSize))
b = bytes.NewBuffer(make([]byte, 0, bktSize))
}
buf := b.(*bytes.Buffer)
buf.Reset()
return b.(*bytes.Buffer)
}
return bytes.NewBuffer(make([]byte, sz))
return bytes.NewBuffer(make([]byte, 0, sz))
}

// Put adds a byte buffer to the right bucket in the pool.
Expand Down
17 changes: 17 additions & 0 deletions pkg/util/pool/bytesbuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pool

import (
"testing"

"github.com/stretchr/testify/require"
)

func Test_ZeroBuffer(t *testing.T) {
p := NewBuffer(2, 10, 2)
require.Equal(t, 0, p.Get(1).Len())
require.Equal(t, 0, p.Get(1).Len())
require.Equal(t, 0, p.Get(2).Len())
require.Equal(t, 0, p.Get(2).Len())
require.Equal(t, 0, p.Get(20).Len())
require.Equal(t, 0, p.Get(20).Len())
}

0 comments on commit e74eb54

Please sign in to comment.