From 1dc3d40d51d2f4e5722c0fcf149570a7c18e432a Mon Sep 17 00:00:00 2001 From: Sergey Melekhin <sergey@melekhin.me> Date: Wed, 22 Nov 2023 17:49:18 +0700 Subject: [PATCH 1/2] sarama-compatible hasher --- pkg/kgo/partitioner.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/pkg/kgo/partitioner.go b/pkg/kgo/partitioner.go index a2123698..e8621841 100644 --- a/pkg/kgo/partitioner.go +++ b/pkg/kgo/partitioner.go @@ -487,7 +487,21 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher { } } -// SaramaHasher returns a PartitionerHasher using hashFn that mirrors how +// SaramaHasher is not compatible with Sarama's default default partitioner. +// If you need sarama compatibility use SaramaCompatHasher instead. +// This function is left as is to provide compatibility with older versions of +// this library. +func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher { + return func(key []byte, n int) int { + p := int(hashFn(key)) % n + if p < 0 { + p = -p + } + return p + } +} + +// SaramaCompatHasher returns a PartitionerHasher using hashFn that mirrors how // Sarama partitions after hashing data. // // Sarama has two differences from Kafka when partitioning: @@ -506,14 +520,14 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher { // // In short, to *exactly* match the Sarama defaults, use the following: // -// kgo.StickyKeyPartitioner(kgo.SaramaHasher(fnv.New32a())) -func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher { +// kgo.StickyKeyPartitioner(kgo.SaramaCompatHasher(fnv.New32a())) +func SaramaCompatHasher(hashFn func([]byte) uint32) PartitionerHasher { return func(key []byte, n int) int { - p := int(hashFn(key)) % n + p := int32(hashFn(key)) % int32(n) if p < 0 { p = -p } - return p + return int(p) } } From d30aefaaada702124b401736d7bdcec6ead3af0f Mon Sep 17 00:00:00 2001 From: Sergey Melekhin <sergey@melekhin.me> Date: Mon, 4 Dec 2023 09:20:02 +0700 Subject: [PATCH 2/2] change comment on SaramaHasher --- pkg/kgo/partitioner.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/kgo/partitioner.go b/pkg/kgo/partitioner.go index e8621841..4cb21896 100644 --- a/pkg/kgo/partitioner.go +++ b/pkg/kgo/partitioner.go @@ -487,10 +487,9 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher { } } -// SaramaHasher is not compatible with Sarama's default default partitioner. -// If you need sarama compatibility use SaramaCompatHasher instead. -// This function is left as is to provide compatibility with older versions of -// this library. +// Deprecated: SaramaHasher is not compatible with Sarama's default partitioner +// and only remains to avoid re-keying records for existing users of this API. See +// [SaramaCompatHasher] for a correct partitioner. func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher { return func(key []byte, n int) int { p := int(hashFn(key)) % n