Skip to content

Commit

Permalink
Improves memchunk benchmark to account for block size. (#4645)
Browse files Browse the repository at this point in the history
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Nov 12, 2021
1 parent 0ee476b commit 11071d4
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 76 deletions.
3 changes: 0 additions & 3 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,6 @@ type bufferedIterator struct {

err error

decBuf []byte // The buffer for decoding the lengths.
buf []byte // The buffer for a single entry.
currLine []byte // the current line, this is the same as the buffer but sliced the the line size.
currTs int64
Expand All @@ -1126,7 +1125,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *buffer
reader: nil, // will be initialized later
bufReader: nil, // will be initialized later
pool: pool,
decBuf: make([]byte, binary.MaxVarintLen64),
}
}

Expand Down Expand Up @@ -1228,7 +1226,6 @@ func (si *bufferedIterator) close() {
si.buf = nil
}
si.origBytes = nil
si.decBuf = nil
}

func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline) iter.EntryIterator {
Expand Down
187 changes: 116 additions & 71 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -42,6 +43,7 @@ var testEncoding = []Encoding{
var (
testBlockSize = 256 * 1024
testTargetSize = 1500 * 1024
testBlockSizes = []int{64 * 1024, 256 * 1024, 512 * 1024}
countExtractor = func() log.StreamSampleExtractor {
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
if err != nil {
Expand Down Expand Up @@ -496,22 +498,41 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
}

func TestChunkSize(t *testing.T) {
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, f, testBlockSize, testTargetSize)
inserted := fillChunk(c)
b, err := c.Bytes()
if err != nil {
t.Fatal(err)
}
t.Log("Chunk size", humanize.Bytes(uint64(len(b))))
t.Log("characters ", humanize.Bytes(uint64(inserted)))
t.Log("Ratio", float64(inserted)/float64(len(b)))
})
type res struct {
name string
size uint64
compressedSize uint64
ratio float64
}
var result []res
for _, bs := range testBlockSizes {
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs)))
t.Run(name, func(t *testing.T) {
c := NewMemChunk(enc, f, bs, testTargetSize)
inserted := fillChunk(c)
b, err := c.Bytes()
if err != nil {
t.Fatal(err)
}
result = append(result, res{
name: name,
size: uint64(inserted),
compressedSize: uint64(len(b)),
ratio: float64(inserted) / float64(len(b)),
})
})
}
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].ratio > result[j].ratio
})
fmt.Printf("%s\t%s\t%s\t%s\n", "name", "uncompressed", "compressed", "ratio")
for _, r := range result {
fmt.Printf("%s\t%s\t%s\t%f\n", r.name, humanize.Bytes(r.size), humanize.Bytes(r.compressedSize), r.ratio)
}
}

func TestChunkStats(t *testing.T) {
Expand Down Expand Up @@ -662,78 +683,102 @@ func (nomatchPipeline) ProcessString(line string) (string, log.LabelsResult, boo
}

func BenchmarkRead(b *testing.B) {
for _, enc := range testEncoding {
b.Run(enc.String(), func(b *testing.B) {
chunks, size := generateData(enc, 5)
b.ResetTimer()
bytesRead := uint64(0)
now := time.Now()
for n := 0; n < b.N; n++ {
for _, c := range chunks {
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nomatchPipeline{})
if err != nil {
panic(err)
}
for iterator.Next() {
_ = iterator.Entry()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
type res struct {
name string
speed float64
}
result := []res{}
for _, bs := range testBlockSizes {
for _, enc := range testEncoding {
name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs)))
b.Run(name, func(b *testing.B) {
chunks, size := generateData(enc, 5, bs, testTargetSize)
b.ResetTimer()
bytesRead := uint64(0)
now := time.Now()
for n := 0; n < b.N; n++ {
for _, c := range chunks {
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nomatchPipeline{})
if err != nil {
panic(err)
}
for iterator.Next() {
_ = iterator.Entry()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
bytesRead += size
}
bytesRead += size
}
b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds())))
b.Log("n=", b.N)
})
result = append(result, res{
name: name,
speed: float64(bytesRead) / time.Since(now).Seconds(),
})
})

b.Run(enc.String()+"_sample", func(b *testing.B) {
chunks, size := generateData(enc, 5)
b.ResetTimer()
bytesRead := uint64(0)
now := time.Now()
for n := 0; n < b.N; n++ {
for _, c := range chunks {
iterator := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), countExtractor)
for iterator.Next() {
_ = iterator.Sample()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
name = fmt.Sprintf("sample_%s_%s", enc.String(), humanize.Bytes(uint64(bs)))

b.Run(name, func(b *testing.B) {
chunks, size := generateData(enc, 5, bs, testTargetSize)
b.ResetTimer()
bytesRead := uint64(0)
now := time.Now()
for n := 0; n < b.N; n++ {
for _, c := range chunks {
iterator := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), countExtractor)
for iterator.Next() {
_ = iterator.Sample()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
bytesRead += size
}
bytesRead += size
}
b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds())))
b.Log("n=", b.N)
})
result = append(result, res{
name: name,
speed: float64(bytesRead) / time.Since(now).Seconds(),
})
})
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].speed > result[j].speed
})
for _, r := range result {
fmt.Printf("%s: %.2f MB/s\n", r.name, r.speed/1024/1024)
}
}

func BenchmarkBackwardIterator(b *testing.B) {
b.ReportAllocs()
c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
panic(err)
}
for iterator.Next() {
_ = iterator.Entry()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
for _, bs := range testBlockSizes {
b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) {
b.ReportAllocs()
c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, bs, testTargetSize)
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
panic(err)
}
for iterator.Next() {
_ = iterator.Entry()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
})
}
}

func TestGenerateDataSize(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
chunks, size := generateData(enc, 50)
chunks, size := generateData(enc, 50, testBlockSize, testTargetSize)

bytesRead := uint64(0)
for _, c := range chunks {
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunkenc/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ func logprotoEntry(ts int64, line string) *logproto.Entry {
}
}

func generateData(enc Encoding, chunksCount int) ([]Chunk, uint64) {
func generateData(enc Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) {
chunks := []Chunk{}
i := int64(0)
size := uint64(0)

for n := 0; n < chunksCount; n++ {
entry := logprotoEntry(0, testdata.LogString(0))
c := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(enc, DefaultHeadBlockFmt, blockSize, targetSize)
for c.SpaceFor(entry) {
size += uint64(len(entry.Line))
_ = c.Append(entry)
Expand Down

0 comments on commit 11071d4

Please sign in to comment.