Skip to content

Commit

Permalink
Merge pull request #622 from C-Pro/sarama-compat-partitioner
Browse files Browse the repository at this point in the history
sarama-compatible hasher
  • Loading branch information
twmb authored Jan 21, 2024
2 parents 3e6d9f9 + d30aefa commit 0b3766d
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions pkg/kgo/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,20 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher {
}
}

// SaramaHasher returns a PartitionerHasher using hashFn that mirrors how
// Deprecated: SaramaHasher is not compatible with Sarama's default partitioner
// and only remains to avoid re-keying records for existing users of this API. See
// [SaramaCompatHasher] for a correct partitioner.
func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher {
return func(key []byte, n int) int {
p := int(hashFn(key)) % n
if p < 0 {
p = -p
}
return p
}
}

// SaramaCompatHasher returns a PartitionerHasher using hashFn that mirrors how
// Sarama partitions after hashing data.
//
// Sarama has two differences from Kafka when partitioning:
Expand All @@ -506,14 +519,14 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher {
//
// In short, to *exactly* match the Sarama defaults, use the following:
//
// kgo.StickyKeyPartitioner(kgo.SaramaHasher(fnv.New32a()))
func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher {
// kgo.StickyKeyPartitioner(kgo.SaramaCompatHasher(fnv.New32a()))
func SaramaCompatHasher(hashFn func([]byte) uint32) PartitionerHasher {
return func(key []byte, n int) int {
p := int(hashFn(key)) % n
p := int32(hashFn(key)) % int32(n)
if p < 0 {
p = -p
}
return p
return int(p)
}
}

Expand Down

0 comments on commit 0b3766d

Please sign in to comment.