diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index ec99aa6ba3c40..4c5b00defe4cc 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -1109,7 +1109,6 @@ type bufferedIterator struct { err error - decBuf []byte // The buffer for decoding the lengths. buf []byte // The buffer for a single entry. currLine []byte // the current line, this is the same as the buffer but sliced the the line size. currTs int64 @@ -1126,7 +1125,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *buffer reader: nil, // will be initialized later bufReader: nil, // will be initialized later pool: pool, - decBuf: make([]byte, binary.MaxVarintLen64), } } @@ -1228,7 +1226,6 @@ func (si *bufferedIterator) close() { si.buf = nil } si.origBytes = nil - si.decBuf = nil } func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline) iter.EntryIterator { diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 7ccfc0a570ebb..cd6f7981b8875 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "math/rand" + "sort" "strconv" "strings" "testing" @@ -42,6 +43,7 @@ var testEncoding = []Encoding{ var ( testBlockSize = 256 * 1024 testTargetSize = 1500 * 1024 + testBlockSizes = []int{64 * 1024, 256 * 1024, 512 * 1024} countExtractor = func() log.StreamSampleExtractor { ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) if err != nil { @@ -496,22 +498,41 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { } func TestChunkSize(t *testing.T) { - for _, f := range HeadBlockFmts { - for _, enc := range testEncoding { - t.Run(enc.String(), func(t *testing.T) { - t.Parallel() - c := NewMemChunk(enc, f, testBlockSize, testTargetSize) - inserted := fillChunk(c) - b, err := c.Bytes() - if err != nil { - t.Fatal(err) - } - t.Log("Chunk size", humanize.Bytes(uint64(len(b)))) - t.Log("characters ", humanize.Bytes(uint64(inserted))) - t.Log("Ratio", float64(inserted)/float64(len(b))) - }) + type res struct { + name string + size uint64 + compressedSize uint64 + ratio float64 + } + var result []res + for _, bs := range testBlockSizes { + for _, f := range HeadBlockFmts { + for _, enc := range testEncoding { + name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs))) + t.Run(name, func(t *testing.T) { + c := NewMemChunk(enc, f, bs, testTargetSize) + inserted := fillChunk(c) + b, err := c.Bytes() + if err != nil { + t.Fatal(err) + } + result = append(result, res{ + name: name, + size: uint64(inserted), + compressedSize: uint64(len(b)), + ratio: float64(inserted) / float64(len(b)), + }) + }) + } } } + sort.Slice(result, func(i, j int) bool { + return result[i].ratio > result[j].ratio + }) + fmt.Printf("%s\t%s\t%s\t%s\n", "name", "uncompressed", "compressed", "ratio") + for _, r := range result { + fmt.Printf("%s\t%s\t%s\t%f\n", r.name, humanize.Bytes(r.size), humanize.Bytes(r.compressedSize), r.ratio) + } } func TestChunkStats(t *testing.T) { @@ -662,78 +683,102 @@ func (nomatchPipeline) ProcessString(line string) (string, log.LabelsResult, boo } func BenchmarkRead(b *testing.B) { - for _, enc := range testEncoding { - b.Run(enc.String(), func(b *testing.B) { - chunks, size := generateData(enc, 5) - b.ResetTimer() - bytesRead := uint64(0) - now := time.Now() - for n := 0; n < b.N; n++ { - for _, c := range chunks { - // use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory - iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nomatchPipeline{}) - if err != nil { - panic(err) - } - for iterator.Next() { - _ = iterator.Entry() - } - if err := iterator.Close(); err != nil { - b.Fatal(err) + type res struct { + name string + speed float64 + } + result := []res{} + for _, bs := range testBlockSizes { + for _, enc := range testEncoding { + name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs))) + b.Run(name, func(b *testing.B) { + chunks, size := generateData(enc, 5, bs, testTargetSize) + b.ResetTimer() + bytesRead := uint64(0) + now := time.Now() + for n := 0; n < b.N; n++ { + for _, c := range chunks { + // use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nomatchPipeline{}) + if err != nil { + panic(err) + } + for iterator.Next() { + _ = iterator.Entry() + } + if err := iterator.Close(); err != nil { + b.Fatal(err) + } } + bytesRead += size } - bytesRead += size - } - b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds()))) - b.Log("n=", b.N) - }) + result = append(result, res{ + name: name, + speed: float64(bytesRead) / time.Since(now).Seconds(), + }) + }) - b.Run(enc.String()+"_sample", func(b *testing.B) { - chunks, size := generateData(enc, 5) - b.ResetTimer() - bytesRead := uint64(0) - now := time.Now() - for n := 0; n < b.N; n++ { - for _, c := range chunks { - iterator := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), countExtractor) - for iterator.Next() { - _ = iterator.Sample() - } - if err := iterator.Close(); err != nil { - b.Fatal(err) + name = fmt.Sprintf("sample_%s_%s", enc.String(), humanize.Bytes(uint64(bs))) + + b.Run(name, func(b *testing.B) { + chunks, size := generateData(enc, 5, bs, testTargetSize) + b.ResetTimer() + bytesRead := uint64(0) + now := time.Now() + for n := 0; n < b.N; n++ { + for _, c := range chunks { + iterator := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), countExtractor) + for iterator.Next() { + _ = iterator.Sample() + } + if err := iterator.Close(); err != nil { + b.Fatal(err) + } } + bytesRead += size } - bytesRead += size - } - b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds()))) - b.Log("n=", b.N) - }) + result = append(result, res{ + name: name, + speed: float64(bytesRead) / time.Since(now).Seconds(), + }) + }) + } + } + sort.Slice(result, func(i, j int) bool { + return result[i].speed > result[j].speed + }) + for _, r := range result { + fmt.Printf("%s: %.2f MB/s\n", r.name, r.speed/1024/1024) } } func BenchmarkBackwardIterator(b *testing.B) { - b.ReportAllocs() - c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize) - _ = fillChunk(c) - b.ResetTimer() - for n := 0; n < b.N; n++ { - iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline) - if err != nil { - panic(err) - } - for iterator.Next() { - _ = iterator.Entry() - } - if err := iterator.Close(); err != nil { - b.Fatal(err) - } + for _, bs := range testBlockSizes { + b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) { + b.ReportAllocs() + c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, bs, testTargetSize) + _ = fillChunk(c) + b.ResetTimer() + for n := 0; n < b.N; n++ { + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline) + if err != nil { + panic(err) + } + for iterator.Next() { + _ = iterator.Entry() + } + if err := iterator.Close(); err != nil { + b.Fatal(err) + } + } + }) } } func TestGenerateDataSize(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { - chunks, size := generateData(enc, 50) + chunks, size := generateData(enc, 50, testBlockSize, testTargetSize) bytesRead := uint64(0) for _, c := range chunks { diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index 7618a8ae5eb80..c4e3de043dc16 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -15,14 +15,14 @@ func logprotoEntry(ts int64, line string) *logproto.Entry { } } -func generateData(enc Encoding, chunksCount int) ([]Chunk, uint64) { +func generateData(enc Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) { chunks := []Chunk{} i := int64(0) size := uint64(0) for n := 0; n < chunksCount; n++ { entry := logprotoEntry(0, testdata.LogString(0)) - c := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(enc, DefaultHeadBlockFmt, blockSize, targetSize) for c.SpaceFor(entry) { size += uint64(len(entry.Line)) _ = c.Append(entry)