Skip to content

Commit

Permalink
produce: adds hashCode-based default partitioner implementation (#25).
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Jun 13, 2016
1 parent 53fa12e commit fe3926b
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
19 changes: 19 additions & 0 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,22 @@ func hashCode(s string) (hc int32) {
}
return
}

func kafkaAbs(i int32) int32 {
switch {
case i == -2147483648: // Integer.MIN_VALUE
return 0
case i < 0:
return i * -1
default:
return i
}
}

func hashCodePartition(key string, partitions int32) int32 {
if partitions <= 0 {
return -1
}

return kafkaAbs(hashCode(key)) % partitions
}
62 changes: 62 additions & 0 deletions produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,68 @@ func TestHashCode(t *testing.T) {
}
}

func TestHashCodePartition(t *testing.T) {

data := []struct {
key string
partitions int32
expected int32
}{
{
key: "",
partitions: 0,
expected: -1,
},
{
key: "",
partitions: 1,
expected: 0,
},
{
key: "super-duper-key",
partitions: 1,
expected: 0,
},
{
key: "",
partitions: 1,
expected: 0,
},
{
key: "",
partitions: 2,
expected: 0,
},
{
key: "a",
partitions: 2,
expected: 1,
},
{
key: "b",
partitions: 2,
expected: 0,
},
{
key: "random",
partitions: 2,
expected: 1,
},
{
key: "random",
partitions: 5,
expected: 0,
},
}

for _, d := range data {
actual := hashCodePartition(d.key, d.partitions)
if actual != d.expected {
t.Errorf("expected %v but found %v for key %#v and %v partitions\n", d.expected, actual, d.key, d.partitions)
}
}
}

func TestProduceParseArgs(t *testing.T) {
configBefore := config
defer func() {
Expand Down

0 comments on commit fe3926b

Please sign in to comment.