-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpartitioners_test.go
89 lines (78 loc) · 2.59 KB
/
partitioners_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package partitioners
import (
"hash/fnv"
"testing"
"github.com/IBM/sarama"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"github.com/twmb/franz-go/pkg/kgo"
)
// fnv32a computes the 32-bit FNV-1a hash of b.
// Sarama and segmentio hash keys with the same function:
// https://github.com/IBM/sarama/blob/d09a287f30bdf73b1c9f53c3f7a4b2ecc2bd41d3/partitioner.go#L178
// https://github.com/segmentio/kafka-go/blob/c6378c391a970fc19c1dbd664016f09c6a73ea8d/balancer.go#L114
func fnv32a(b []byte) uint32 {
	hasher := fnv.New32a()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write(b)
	return hasher.Sum32()
}
// SaramaCompatHasher adapts hashFn into a kgo.PartitionerHasher.
//
// The returned hasher converts the 32-bit hash of the key to int32 before
// taking the remainder modulo n (also converted to int32), and negates the
// remainder when it is negative, so the result is never below zero.
func SaramaCompatHasher(hashFn func([]byte) uint32) kgo.PartitionerHasher {
	return func(key []byte, n int) int {
		// The int32 conversion is deliberate: hashes with the top bit set
		// become negative, which changes the remainder.
		rem := int32(hashFn(key)) % int32(n)
		if rem >= 0 {
			return int(rem)
		}
		return int(-rem)
	}
}
// TestPartitioner feeds random UUID keys to four partitioners and reports
// every case where they disagree with Sarama's hash partitioner, for each
// partition count from 2 through 24 (100 keys per count).
func TestPartitioner(t *testing.T) {
	const topic = "anytopic"

	// this is the default in Sarama:
	// https://github.com/IBM/sarama/blob/d09a287f30bdf73b1c9f53c3f7a4b2ecc2bd41d3/config.go#L529C27-L529C45
	saramaP := sarama.NewHashPartitioner(topic)

	// This is "sarama-compatible" from segmentio:
	// https://github.com/segmentio/kafka-go/blob/c6378c391a970fc19c1dbd664016f09c6a73ea8d/balancer.go#L127
	segmentioP := &kafka.Hash{Hasher: nil}

	// Franz declares this "sarama-compatible" but it is not.
	// https://github.com/twmb/franz-go/blob/a6d10d4ad93528f7c2e8ed735a40bb1bd8a03c8a/pkg/kgo/partitioner.go#L509
	franzP := kgo.StickyKeyPartitioner(kgo.SaramaHasher(fnv32a)).ForTopic(topic)

	// With this modified hasher it works as expected.
	franzPmod := kgo.StickyKeyPartitioner(SaramaCompatHasher(fnv32a)).ForTopic(topic)

	for n := 2; n < 25; n++ {
		// The partition list depends only on n, so build it once per count
		// instead of once per key.
		partitions := make([]int, n)
		for j := range partitions {
			partitions[j] = j
		}

		for i := 0; i < 100; i++ {
			key := uuid.NewString()
			keyBytes := []byte(key)

			pseg := segmentioP.Balance(kafka.Message{Key: keyBytes}, partitions...)
			pfrz := franzP.Partition(&kgo.Record{Key: keyBytes}, n)
			pfrzmod := franzPmod.Partition(&kgo.Record{Key: keyBytes}, n)

			psar, err := saramaP.Partition(&sarama.ProducerMessage{
				Key: sarama.StringEncoder(key),
			}, int32(n))
			if err != nil {
				// Without a Sarama result there is nothing to compare against.
				t.Fatalf("sarama partition for key %q, partitions %d: %v", key, n, err)
			}

			// Sarama and segmentio are always equal.
			if pseg != int(psar) {
				t.Errorf("key %q, partitions %d, segmentio %d, sarama %d", key, n, pseg, psar)
			}
			// But franz gives a different partition number; every mismatch
			// is reported here.
			if int(psar) != pfrz {
				t.Errorf("key %q, partitions %d, sarama %d, franz %d", key, n, psar, pfrz)
			}
			// With the modified hasher franz gives the same partition number.
			if int(psar) != pfrzmod {
				t.Errorf("key %q, partitions %d, sarama %d, franz (modified) %d", key, n, psar, pfrzmod)
			}
		}
	}
}