diff --git a/pkg/kgo/partitioner.go b/pkg/kgo/partitioner.go index a2123698..4cb21896 100644 --- a/pkg/kgo/partitioner.go +++ b/pkg/kgo/partitioner.go @@ -487,7 +487,20 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher { } } -// SaramaHasher returns a PartitionerHasher using hashFn that mirrors how +// Deprecated: SaramaHasher is not compatible with Sarama's default partitioner +// and only remains to avoid re-keying records for existing users of this API. See +// [SaramaCompatHasher] for a correct partitioner. +func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher { + return func(key []byte, n int) int { + p := int(hashFn(key)) % n + if p < 0 { + p = -p + } + return p + } +} + +// SaramaCompatHasher returns a PartitionerHasher using hashFn that mirrors how // Sarama partitions after hashing data. // // Sarama has two differences from Kafka when partitioning: @@ -506,14 +519,14 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher { // // In short, to *exactly* match the Sarama defaults, use the following: // -// kgo.StickyKeyPartitioner(kgo.SaramaHasher(fnv.New32a())) -func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher { +// kgo.StickyKeyPartitioner(kgo.SaramaCompatHasher(fnv.New32a())) +func SaramaCompatHasher(hashFn func([]byte) uint32) PartitionerHasher { return func(key []byte, n int) int { - p := int(hashFn(key)) % n + p := int32(hashFn(key)) % int32(n) if p < 0 { p = -p } - return p + return int(p) } }