Skip to content

Commit

Permalink
Added benchmark for using kgo.FetchMaxPartitionBytes
Browse files Browse the repository at this point in the history
  • Loading branch information
sbuliarca committed Mar 29, 2024
1 parent 351e7fa commit 46a6744
Show file tree
Hide file tree
Showing 7 changed files with 508 additions and 0 deletions.
10 changes: 10 additions & 0 deletions examples/bench/bench-fetch-max-partition-bytes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Benchmark for using kgo.FetchMaxPartitionBytes

Running the benchmark:
1. Run: ```go test -bench . -benchmem -count 15 -cpu=1 > testrun_normal.txt```
2. Uncomment ```//kgo.FetchMaxPartitionBytes(10*1024*1024),``` on line 53 in ./client_benchmark_test.go
3. Run ```go test -bench . -benchmem -count 15 -cpu=1 > testrun_with_FetchMaxPartitionBytes10MB.txt```
4. Run ```go install golang.org/x/perf/cmd/benchstat@latest```
5. Run ``` benchstat testrun_normal.txt testrun_with_FetchMaxPartitionBytes10MB.txt > benchstat_results.txt```

In the results you can see that there is more than 300% in the memory allocation.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
goos: darwin
goarch: arm64
pkg: github.com/utilitywarehouse/franz-go/examples/bench/bench-fetch-max-partition-bytes
│ testrun_normal.txt │ testrun_with_FetchMaxPartitionBytes10MB.txt │
│ sec/op │ sec/op vs base │
PollRecords 3.396 ± 1% 3.698 ± 1% +8.89% (p=0.000 n=15)

│ testrun_normal.txt │ testrun_with_FetchMaxPartitionBytes10MB.txt │
│ B/op │ B/op vs base │
PollRecords 126.6Mi ± 0% 625.3Mi ± 0% +393.82% (p=0.000 n=15)

│ testrun_normal.txt │ testrun_with_FetchMaxPartitionBytes10MB.txt │
│ allocs/op │ allocs/op vs base │
PollRecords 13.75k ± 0% 61.91k ± 0% +350.37% (p=0.000 n=15)
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package benchmark_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/modules/redpanda"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"

"crypto/rand"
"github.com/google/uuid"
"github.com/testcontainers/testcontainers-go"
"io"
"log"
)

const (
totalEntries = 500000
)

func BenchmarkPollRecords(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
b.Cleanup(cancel)

testcontainers.Logger = log.New(io.Discard, "", log.LstdFlags)

ctnr, err := redpanda.RunContainer(ctx, redpanda.WithAutoCreateTopics())
require.NoError(b, err)

broker, err := ctnr.KafkaSeedBroker(ctx)
require.NoError(b, err)

b.Cleanup(func() {
if err := ctnr.Terminate(context.Background()); err != nil {
b.Logf("failed terminating container: %v", err)
}
})

topicName := newRandomName("test-topic")
require.NoError(b, populateTopic(ctx, b, broker, topicName))

b.ResetTimer()
consumer, err := kgo.NewClient(
kgo.SeedBrokers(broker),
kgo.ConsumerGroup(newRandomName("test-consumer")),
kgo.ConsumeTopics(topicName),
kgo.BlockRebalanceOnPoll(),
// use 10MB
//kgo.FetchMaxPartitionBytes(10*1024*1024),
)

require.NoError(b, err)
defer consumer.CloseAllowingRebalance()

consumedTotal := 0

for i := 0; i < b.N; i++ {
fetches := consumer.PollRecords(ctx, 1000)
require.NoError(b, fetches.Err0())
consumedTotal += len(fetches.Records())
if consumedTotal >= totalEntries {
return
}
}
}

func populateTopic(ctx context.Context, t *testing.B, broker string, topicName string) error {
t.Log("Start topic population")
producer, err := kgo.NewClient(
kgo.SeedBrokers(broker),
kgo.DefaultProduceTopic(topicName),
)
require.NoError(t, err)
defer producer.Close()

adm := kadm.NewClient(producer)
partitions := 10
tr, err := adm.CreateTopic(ctx, int32(partitions), 1, nil, topicName)
require.NoError(t, tr.Err)
require.NoError(t, err)

data := make([]byte, 10240) // 10KB per message
_, err = rand.Read(data)
require.NoError(t, err)

batchLen := 5000
for it := 0; it < totalEntries/batchLen; it++ {
recs := make([]*kgo.Record, batchLen)
for i := 0; i < batchLen; i++ {
recs[i] = &kgo.Record{
Value: data,
Key: []byte(fmt.Sprintf("key-%d", i%partitions)),
}
}
require.NoError(t, producer.ProduceSync(ctx, recs...).FirstErr())
}
return err
}

func newRandomName(baseName string) string {
return baseName + "-" + uuid.NewString()
}
68 changes: 68 additions & 0 deletions examples/bench/bench-fetch-max-partition-bytes/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
module github.com/utilitywarehouse/franz-go/examples/bench/bench-fetch-max-partition-bytes

go 1.22.1

require (
github.com/google/uuid v1.6.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.29.1
github.com/testcontainers/testcontainers-go/modules/redpanda v0.29.1
github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/pkg/kadm v1.11.0
)

require (
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.11.4 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/containerd/containerd v1.7.12 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/docker v25.0.3+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.19 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 46a6744

Please sign in to comment.